This is an automated email from the ASF dual-hosted git repository.

sjwiesman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 828bdd015e7fbe05baa6f19ffc9183049fcd1246
Author: Leonard Xu <[email protected]>
AuthorDate: Fri Nov 20 20:06:28 2020 +0800

    [FLINK-19082][doc] Add docs for temporal table and temporal table join
---
 docs/dev/table/sql/queries.md                  |  50 ++--
 docs/dev/table/sql/queries.zh.md               |  50 ++--
 docs/dev/table/streaming/joins.md              | 393 ++++++++++++++-----------
 docs/dev/table/streaming/joins.zh.md           | 363 ++++++++++++-----------
 docs/dev/table/streaming/temporal_tables.md    | 380 ++++++++++++++----------
 docs/dev/table/streaming/temporal_tables.zh.md | 365 +++++++++++++----------
 6 files changed, 908 insertions(+), 693 deletions(-)

diff --git a/docs/dev/table/sql/queries.md b/docs/dev/table/sql/queries.md
index 215d99f..53f3778 100644
--- a/docs/dev/table/sql/queries.md
+++ b/docs/dev/table/sql/queries.md
@@ -722,6 +722,31 @@ FROM Orders LEFT JOIN LATERAL TABLE(unnest_udtf(tags)) ON 
TRUE
     </tr>
     <tr>
       <td>
+        <strong>Join with Temporal Table</strong><br>
+        <span class="label label-primary">Batch</span> <span class="label 
label-primary">Streaming</span>
+      </td>
+      <td>
+        <p><a href="{{ site.baseurl 
}}/dev/table/streaming/temporal_tables.html">Temporal Tables</a> are tables 
that track changes over time.
+        A <a href="{{ site.baseurl 
}}/dev/table/streaming/temporal_tables.html#temporal-table">Temporal Table</a> 
provides access to the versions of a temporal table at a specific point in 
time.</p>
+
+        <p>Processing-time temporal join and event-time temporal join are 
supported, inner join and left join are supported.</p>
+        <p>The event-time temporal join is not suppored in <span class="label 
label-primary">Batch</span></p>
+        <p>The following example assumes that <strong>LatestRates</strong> is 
a <a href="{{ site.baseurl 
}}/dev/table/streaming/temporal_tables.html#temporal-table">Temporal Table</a> 
which is materialized with the latest rate.</p>
+{% highlight sql %}
+SELECT
+  o.amount, o.currency, r.rate, o.amount * r.rate
+FROM
+  Orders AS o
+  JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
+  ON r.currency = o.currency
+{% endhighlight %}
+        <p>The RHS table can be named with an alias using optional clause 
<code>[[<strong>AS</strong>] correlationName]</code>, note that the 
<code><strong>AS</strong></code> keyword is also optional.</p>
+        <p>For more information please check the more detailed <a href="{{ 
site.baseurl }}/dev/table/streaming/temporal_tables.html">Temporal Tables</a> 
concept description.</p>
+        <p>Only supported in Blink planner.</p>
+      </td>
+    </tr>    
+    <tr>
+      <td>
         <strong>Join with Temporal Table Function</strong><br>
         <span class="label label-primary">Streaming</span>
       </td>
@@ -745,31 +770,6 @@ WHERE
         <p>For more information please check the more detailed <a href="{{ 
site.baseurl }}/dev/table/streaming/temporal_tables.html">temporal tables 
concept description</a>.</p>
       </td>
     </tr>
-    <tr>
-      <td>
-        <strong>Join with Temporal Table</strong><br>
-        <span class="label label-primary">Batch</span> <span class="label 
label-primary">Streaming</span>
-      </td>
-      <td>
-        <p><a href="{{ site.baseurl 
}}/dev/table/streaming/temporal_tables.html">Temporal Tables</a> are tables 
that track changes over time.
-        A <a href="{{ site.baseurl 
}}/dev/table/streaming/temporal_tables.html#temporal-table">Temporal Table</a> 
provides access to the versions of a temporal table at a specific point in 
time.</p>
-
-        <p>Only inner and left joins with processing-time temporal tables are 
supported.</p>
-        <p>The following example assumes that <strong>LatestRates</strong> is 
a <a href="{{ site.baseurl 
}}/dev/table/streaming/temporal_tables.html#temporal-table">Temporal Table</a> 
which is materialized with the latest rate.</p>
-{% highlight sql %}
-SELECT
-  o.amout, o.currency, r.rate, o.amount * r.rate
-FROM
-  Orders AS o
-  JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
-  ON r.currency = o.currency
-{% endhighlight %}
-        <p>The RHS table can be named with an alias using optional clause 
<code>[[<strong>AS</strong>] correlationName]</code>, note that the 
<code><strong>AS</strong></code> keyword is also optional.</p>
-        <p>For more information please check the more detailed <a href="{{ 
site.baseurl }}/dev/table/streaming/temporal_tables.html">Temporal Tables</a> 
concept description.</p>
-        <p>Only supported in Blink planner.</p>
-      </td>
-    </tr>
-
   </tbody>
 </table>
 </div>
diff --git a/docs/dev/table/sql/queries.zh.md b/docs/dev/table/sql/queries.zh.md
index a626fc6..2ee73eb 100644
--- a/docs/dev/table/sql/queries.zh.md
+++ b/docs/dev/table/sql/queries.zh.md
@@ -721,6 +721,31 @@ FROM Orders LEFT JOIN LATERAL TABLE(unnest_udtf(tags)) AS 
t(tag) ON TRUE
     </tr>
     <tr>
       <td>
+        <strong>Join Temporal Tables </strong><br>
+        <span class="label label-primary">批处理</span> <span class="label 
label-primary">流处理</span>
+      </td>
+      <td>
+        <p><a href="{{ site.baseurl 
}}/zh/dev/table/streaming/temporal_tables.html">Temporal Tables</a> 是随时间变化而变化的表。
+        <a href="{{ site.baseurl 
}}/zh/dev/table/streaming/temporal_tables.html#temporal-table">Temporal 
Table</a> 提供访问指定时间点的 temporal table 版本的功能。</p>
+
+        <p>支持基于处理时间 或 基于事件时间 的 temporal table join, 支持 inner 和 left join。 </p>
+        <p>基于事件时间的 temporal table join 在 <span class="label 
label-primary">批处理</span> 中暂不支持。</p>
+        <p>下述示例中,假设 <strong>LatestRates</strong> 是一个根据最新的 rates 物化的 <a 
href="{{ site.baseurl 
}}/zh/dev/table/streaming/temporal_tables.html#temporal-table">Temporal 
Table</a> 。</p>
+{% highlight sql %}
+SELECT
+  o.amount, o.currency, r.rate, o.amount * r.rate
+FROM
+  Orders AS o
+  JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
+  ON r.currency = o.currency
+{% endhighlight %}
+        <p>Join 的右表可以使用可选表达式 <code>[[<strong>AS</strong>] 
correlationName]</code> 取别名,注意 <code><strong>AS</strong></code> 关键词也是可选的。</p>
+        <p>请阅读 <a href="{{ site.baseurl 
}}/zh/dev/table/streaming/temporal_tables.html">Temporal Tables</a> 
概念描述以了解详细信息。</p>
+        <p>仅 Blink planner 支持。</p>
+      </td>
+    </tr>    
+    <tr>
+      <td>
         <strong>Join Temporal Table Function</strong><br>
         <span class="label label-primary">流处理</span>
       </td>
@@ -744,31 +769,6 @@ WHERE
         <p>请查看 <a href="{{ site.baseurl 
}}/zh/dev/table/streaming/temporal_tables.html"> Temporal Tables 概念描述</a> 
以了解详细信息。</p>
       </td>
     </tr>
-    <tr>
-      <td>
-        <strong>Join Temporal Tables </strong><br>
-        <span class="label label-primary">批处理</span> <span class="label 
label-primary">流处理</span>
-      </td>
-      <td>
-        <p><a href="{{ site.baseurl 
}}/zh/dev/table/streaming/temporal_tables.html">Temporal Tables</a> 是随时间变化而变化的表。
-        <a href="{{ site.baseurl 
}}/zh/dev/table/streaming/temporal_tables.html#temporal-table">Temporal 
Table</a> 提供访问指定时间点的 temporal table 版本的功能。</p>
-
-        <p>仅支持带有处理时间的 temporal tables 的 inner 和 left join。</p>
-        <p>下述示例中,假设 <strong>LatestRates</strong> 是一个根据最新的 rates 物化的 <a 
href="{{ site.baseurl 
}}/zh/dev/table/streaming/temporal_tables.html#temporal-table">Temporal 
Table</a> 。</p>
-{% highlight sql %}
-SELECT
-  o.amout, o.currency, r.rate, o.amount * r.rate
-FROM
-  Orders AS o
-  JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
-  ON r.currency = o.currency
-{% endhighlight %}
-        <p>Join 的右表可以使用可选表达式 <code>[[<strong>AS</strong>] 
correlationName]</code> 取别名,注意 <code><strong>AS</strong></code> 关键词也是可选的。</p>
-        <p>请阅读 <a href="{{ site.baseurl 
}}/zh/dev/table/streaming/temporal_tables.html">Temporal Tables</a> 
概念描述以了解详细信息。</p>
-        <p>仅 Blink planner 支持。</p>
-      </td>
-    </tr>
-
   </tbody>
 </table>
 </div>
diff --git a/docs/dev/table/streaming/joins.md 
b/docs/dev/table/streaming/joins.md
index 98c96e6..6d6f340 100644
--- a/docs/dev/table/streaming/joins.md
+++ b/docs/dev/table/streaming/joins.md
@@ -49,7 +49,7 @@ However, this operation has an important implication: it 
requires to keep both s
 Thus, the resource usage will grow indefinitely as well, if one or both input 
tables are continuously growing.
 
 Interval Joins
--------------------
+--------------
 
 A interval join is defined by a join predicate, that checks if the [time 
attributes](time_attributes.html) of the input
 records are within certain time constraints, i.e., a time window.
@@ -65,174 +65,147 @@ WHERE o.id = s.orderId AND
 
 Compared to a regular join operation, this kind of join only supports 
append-only tables with time attributes. Since time attributes are 
quasi-monotonic increasing, Flink can remove old values from its state without 
affecting the correctness of the result.
 
-Join with a Temporal Table Function
---------------------------
-
-A join with a temporal table function joins an append-only table (left 
input/probe side) with a temporal table (right input/build side),
-i.e., a table that changes over time and tracks its changes. Please check the 
corresponding page for more information about [temporal 
tables](temporal_tables.html).
+Temporal Joins
+--------------
+<span class="label label-danger">Attention</span> This feature is only 
supported in Blink planner.
+<span class="label label-danger">Attention</span> Attention it is only 
supported in SQL, and not supported in Table API yet.
 
-The following example shows an append-only table `Orders` that should be 
joined with the continuously changing currency rates table `RatesHistory`.
+Temporal join is an arbitrary table (left input/probe side) correlate with the 
versions of temporal table (right input/build side), The temporal table can be 
an external dimension table that changes over time 
+or a changelog that tracks all history changes. Please check the corresponding 
page for more information about [temporal tables](temporal_tables.html).
 
-`Orders` is an append-only table that represents payments for the given 
`amount` and the given `currency`.
-For example at `10:15` there was an order for an amount of `2 Euro`.
+Flink uses the SQL syntax of `FOR SYSTEM_TIME AS OF` to query temporal table, 
which is proposed in SQL:2011 standard. The syntax of a temporal table join is 
as follows:
 
 {% highlight sql %}
-SELECT * FROM Orders;
-
-rowtime amount currency
-======= ====== =========
-10:15        2 Euro
-10:30        1 US Dollar
-10:32       50 Yen
-10:52        3 Euro
-11:04        5 US Dollar
+SELECT [column_list]
+FROM table1 [AS <alias1>]
+[LEFT] JOIN table2 FOR SYSTEM_TIME AS OF table1.{ proctime | rowtime } [AS 
<alias2>]
+ON table1.column-name1 = table2.column-name1
 {% endhighlight %}
 
-`RatesHistory` represents an ever changing append-only table of currency 
exchange rates with respect to `Yen` (which has a rate of `1`).
-For example, the exchange rate for the period from `09:00` to `10:45` of 
`Euro` to `Yen` was `114`. From `10:45` to `11:15` it was `116`.
+### Event-time Temporal Joins
 
-{% highlight sql %}
-SELECT * FROM RatesHistory;
+Event-time temporal join uses the left input tables event time attribute to 
correlate the corresponding version of [versioned 
table](temporal_tables.html#defining-versioned-table).
+Event-time temporal join only supports versioned tables and the versioned 
tables need to be a changelog stream. However, an append-only stream can be 
converted to a changelog stream in Flink, thus the versioned table can come 
from an append-only stream,
+please see [versioned view](temporal_tables.html#defining-versioned-view) for 
more information to know how to define a versioned table from an append-only 
stream.
 
-rowtime currency   rate
-======= ======== ======
-09:00   US Dollar   102
-09:00   Euro        114
-09:00   Yen           1
-10:45   Euro        116
-11:15   Euro        119
-11:49   Pounds      108
-{% endhighlight %}
-
-Given that we would like to calculate the amount of all `Orders` converted to 
a common currency (`Yen`).
+With an event-time time attribute (i.e., a rowtime attribute), it is possible 
to use a _past_ time attribute with the temporal table.
+This allows for joining the two tables at a common point in time. Compared to 
processing-time temporal joins, the temporal table does not only keep the 
latest version of
+the build side records in the state but stores all versions (identified by 
time) since the last watermark.
 
-For example, we would like to convert the following order using the 
appropriate conversion rate for the given `rowtime` (`114`).
+For example, an incoming row with an event-time timestamp of `12:30:00` that 
is appended to the probe side table
+is joined with the version of the build side table at time `12:30:00` 
according to the [concept of temporal tables](temporal_tables.html).
+Thus, the incoming row is only joined with rows that have a timestamp lower or 
equal to `12:30:00` with
+applied updates according to the primary key until this point in time.
 
-{% highlight text %}
-rowtime amount currency
-======= ====== =========
-10:15        2 Euro
-{% endhighlight %}
+By definition of event time, [watermarks]({{ site.baseurl 
}}/dev/event_time.html) allow the join operation to move
+forward in time and discard versions of the build table that are no longer 
necessary because no incoming row with
+lower or equal timestamp is expected.
 
-Without using the concept of [temporal tables](temporal_tables.html), one 
would need to write a query like:
+The following event-time temporal table join example shows an append-only 
table `orders` that should be joined with the  `product_changelog` which comes 
from the changelog of the database table `products`, the product price in table 
`products` is changing over time. 
 
 {% highlight sql %}
-SELECT
-  SUM(o.amount * r.rate) AS amount
-FROM Orders AS o,
-  RatesHistory AS r
-WHERE r.currency = o.currency
-AND r.rowtime = (
-  SELECT MAX(rowtime)
-  FROM RatesHistory AS r2
-  WHERE r2.currency = o.currency
-  AND r2.rowtime <= o.rowtime);
+SELECT * FROM product_changelog;
+
+(changelog kind)  update_time product_name price
+================= =========== ============ ===== 
++(INSERT)         00:01:00    scooter      11.11
++(INSERT)         00:02:00    basketball   23.11
+-(UPDATE_BEFORE)  12:00:00    scooter      11.11
++(UPDATE_AFTER)   12:00:00    scooter      12.99  <= the price of `scooter` 
increased to `12.99` at `12:00:00`
+-(UPDATE_BEFORE)  12:00:00    basketball   23.11 
++(UPDATE_AFTER)   12:00:00    basketball   19.99  <= the price of `basketball` 
decreased to `19.99` at `12:00:00`
+-(DELETE)         18:00:00    scooter      12.99  <= the product `basketball` 
is deleted at `18:00:00`
 {% endhighlight %}
 
-With the help of a temporal table function `Rates` over `RatesHistory`, we can 
express such a query in SQL as:
-
+Given that we would like to output the version of `product_changelog` table of 
the time `10:00:00`, the following table shows the result. 
 {% highlight sql %}
-SELECT
-  o.amount * r.rate AS amount
-FROM
-  Orders AS o,
-  LATERAL TABLE (Rates(o.rowtime)) AS r
-WHERE r.currency = o.currency
+update_time  product_id product_name price
+===========  ========== ============ ===== 
+00:01:00     p_001      scooter      11.11
+00:02:00     p_002      basketball   23.11
 {% endhighlight %}
 
-Each record from the probe side will be joined with the version of the build 
side table at the time of the correlated time attribute of the probe side 
record.
-In order to support updates (overwrites) of previous values on the build side 
table, the table must define a primary key.
-
-In our example, each record from `Orders` will be joined with the version of 
`Rates` at time `o.rowtime`. The `currency` field has been defined as the 
primary key of `Rates` before and is used to connect both tables in our 
example. If the query were using a processing-time notion, a newly appended 
order would always be joined with the most recent version of `Rates` when 
executing the operation.
-
-In contrast to [regular joins](#regular-joins), this means that if there is a 
new record on the build side, it will not affect the previous results of the 
join.
-This again allows Flink to limit the number of elements that must be kept in 
the state.
-
-Compared to [interval joins](#interval-joins), temporal table joins do not 
define a time window within which bounds the records will be joined.
-Records from the probe side are always joined with the build side's version at 
the time specified by the time attribute. Thus, records on the build side might 
be arbitrarily old.
-As time passes, the previous and no longer needed versions of the record (for 
the given primary key) will be removed from the state.
-
-Such behaviour makes a temporal table join a good candidate to express stream 
enrichment in relational terms.
-
-### Usage
+Given that we would like to output the version of `product_changelog` table of 
the time `13:00:00`, the following table shows the result. 
+{% highlight sql %}
+update_time  product_id product_name price
+===========  ========== ============ ===== 
+12:00:00     p_001      scooter      12.99
+12:00:00     p_002      basketball   19.99
+{% endhighlight %}
 
-After [defining temporal table 
function](temporal_tables.html#defining-temporal-table-function), we can start 
using it.
-Temporal table functions can be used in the same way as normal table functions 
would be used.
+With the help of event-time temporal table join, we can join different version 
of versioned table.
 
-The following code snippet solves our motivating problem of converting 
currencies from the `Orders` table:
-
-<div class="codetabs" markdown="1">
-<div data-lang="SQL" markdown="1">
 {% highlight sql %}
+CREATE TABLE orders (
+  order_id STRING,
+  product_id STRING,
+  order_time TIMESTAMP(3),
+  WATERMARK FOR order_time AS order_time  -- defines the necessary event time
+) WITH (
+...
+);
+
+-- Set the session time zone to UTC, the database operation time of changelog 
stored in epoch milliseconds
+-- Flink SQL will use the session time zone when convert the changelog time 
from milliseconds to timestamp
+-- Thus, please set proper timezone according to your database operation time 
in changelog.
+SET table.local-time-zone=UTC;
+
+-- Define a versioned table
+CREATE TABLE product_changelog (
+  product_id STRING,
+  product_name STRING,
+  product_price DECIMAL(10, 4),
+  update_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,  -- 
Note: Automatically convert the changelog time milliseconds to timestamp
+  PRIMARY KEY(product_id) NOT ENFORCED,      -- (1) defines the primary key 
constraint
+  WATERMARK FOR update_time AS update_time   -- (2) defines the event time by 
watermark                               
+) WITH (
+  'connector' = 'kafka', 
+  'topic' = 'products',
+  'scan.startup.mode' = 'earliest-offset',
+  'properties.bootstrap.servers' = 'localhost:9092',
+  'value.format' = 'debezium-json'
+);
+
+-- Do event-time temporal join
 SELECT
-  SUM(o_amount * r_rate) AS amount
-FROM
-  Orders,
-  LATERAL TABLE (Rates(o_proctime))
-WHERE
-  r_currency = o_currency
-{% endhighlight %}
-</div>
-<div data-lang="java" markdown="1">
-{% highlight java %}
-Table result = orders
-    .joinLateral("rates(o_proctime)", "o_currency = r_currency")
-    .select("(o_amount * r_rate).sum as amount");
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val result = orders
-    .joinLateral(rates('o_proctime), 'r_currency === 'o_currency)
-    .select(('o_amount * 'r_rate).sum as 'amount)
+  order_id,
+  order_time,
+  product_name,
+  product_time,
+  price
+FROM orders AS O
+LEFT JOIN product_changelog FOR SYSTEM_TIME AS OF O.order_time AS P
+ON O.product_id = P.product_id;
+
+order_id order_time product_name product_time price
+======== ========== ============ ============ =====
+o_001    00:01:00   scooter      00:01:00     11.11
+o_002    00:03:00   basketball   00:02:00     23.11
+o_003    12:00:00   scooter      12:00:00     12.99
+o_004    12:00:00   basketball   12:00:00     19.99
+o_005    18:00:00   NULL         NULL         NULL
 {% endhighlight %}
-</div>
-</div>
 
-**Note**: State retention defined in a [query 
configuration](query_configuration.html) is not yet implemented for temporal 
joins.
-This means that the required state to compute the query result might grow 
infinitely depending on the number of distinct primary keys for the history 
table.
+The event-time temporal join is usually used to enrich the stream with 
changelog stream.
+
+**Note**: The event-time temporal join is triggered by watermark from left 
side and right side, please ensure the both sides of the join have set 
watermark properly. 
+
+**Note**: The event-time temporal join requires the primary key must be 
contained in the equivalence condition of temporal join condition, e.g. The 
primary key `P.product_id` of table `product_changelog` must be contained in 
condition `O.product_id = P.product_id`.
 
 ### Processing-time Temporal Joins
 
-With a processing-time time attribute, it is impossible to pass _past_ time 
attributes as an argument to the temporal table function.
-By definition, it is always the current timestamp. Thus, invocations of a 
processing-time temporal table function will always return the latest known 
versions of the underlying table
-and any updates in the underlying history table will also immediately 
overwrite the current values.
+Processing-time temporal join uses the left input tables processing time 
attribute to correlate the latest version of a [regular 
table](temporal_tables.html#defining-regular-table).
+Processing-time temporal join only supports regular tables currently, and the 
supported regular table can only contain append-only stream. 
 
-Only the latest versions (with respect to the defined primary key) of the 
build side records are kept in the state.
-Updates of the build side will have no effect on previously emitted join 
results.
+With a processing-time time attribute, it is impossible to use a _past_ time 
attribute as an argument to the temporal table.
+By definition, it is always the current timestamp. Thus, invocations of 
correlating temporal table will always return the latest known versions of the 
underlying table and any updates in the underlying history table will also 
immediately overwrite the current values.
 
 One can think about a processing-time temporal join as a simple `HashMap<K, 
V>` that stores all of the records from the build side.
 When a new record from the build side has the same key as some previous 
record, the old value is just simply overwritten.
 Every record from the probe side is always evaluated against the most 
recent/current state of the `HashMap`.
 
-### Event-time Temporal Joins
-
-With an event-time time attribute (i.e., a rowtime attribute), it is possible 
to pass _past_ time attributes to the temporal table function.
-This allows for joining the two tables at a common point in time.
-
-Compared to processing-time temporal joins, the temporal table does not only 
keep the latest version (with respect to the defined primary key) of the build 
side records in the state
-but stores all versions (identified by time) since the last watermark.
-
-For example, an incoming row with an event-time timestamp of `12:30:00` that 
is appended to the probe side table
-is joined with the version of the build side table at time `12:30:00` 
according to the [concept of temporal tables](temporal_tables.html).
-Thus, the incoming row is only joined with rows that have a timestamp lower or 
equal to `12:30:00` with
-applied updates according to the primary key until this point in time.
-
-By definition of event time, [watermarks]({{ site.baseurl 
}}/dev/event_time.html) allow the join operation to move
-forward in time and discard versions of the build table that are no longer 
necessary because no incoming row with
-lower or equal timestamp is expected.
-
-Join with a Temporal Table
---------------------------
-
-A join with a temporal table joins an arbitrary table (left input/probe side) 
with a temporal table (right input/build side),
-i.e., an external dimension table that changes over time. Please check the 
corresponding page for more information about [temporal 
tables](temporal_tables.html#temporal-table).
-
-<span class="label label-danger">Attention</span> Users can not use arbitrary 
tables as a temporal table, but need to use a table backed by a 
`LookupableTableSource`. A `LookupableTableSource` can only be used for 
temporal join as a temporal table. See the page for more details about [how to 
define 
LookupableTableSource](../sourceSinks.html#defining-a-tablesource-with-lookupable).
-
-The following example shows an `Orders` stream that should be joined with the 
continuously changing currency rates table `LatestRates`.
-
-`LatestRates` is a dimension table that is materialized with the latest rate. 
At time `10:15`, `10:30`, `10:52`, the content of `LatestRates` looks as 
follows:
+The following processing-time temporal table join example shows an append-only 
table `orders` that should be joined with the table `LatestRates`,
+`LatestRates` is a dimension table (e.g. HBase table) that is materialized 
with the latest rate. At time `10:15`, `10:30`, `10:52`, the content of 
`LatestRates` looks as follows:
 
 {% highlight sql %}
 10:15> SELECT * FROM LatestRates;
@@ -251,7 +224,6 @@ US Dollar   102
 Euro        114
 Yen           1
 
-
 10:52> SELECT * FROM LatestRates;
 
 currency   rate
@@ -263,8 +235,7 @@ Yen           1
 
 The content of `LastestRates` at time `10:15` and `10:30` are equal. The Euro 
rate has changed from 114 to 116 at `10:52`.
 
-`Orders` is an append-only table that represents payments for the given 
`amount` and the given `currency`.
-For example at `10:15` there was an order for an amount of `2 Euro`.
+`Orders` is an append-only table that represents payments for the given 
`amount` and the given `currency`. For example at `10:15` there was an order 
for an amount of `2 Euro`.
 
 {% highlight sql %}
 SELECT * FROM Orders;
@@ -300,37 +271,118 @@ FROM
   ON r.currency = o.currency
 {% endhighlight %}
 
-Each record from the probe side will be joined with the current version of the 
build side table. In our example, the query is using the processing-time 
notion, so a newly appended order would always be joined with the most recent 
version of `LatestRates` when executing the operation. Note that the result is 
not deterministic for processing-time.
+Each record from the probe side will be joined with the current version of the 
build side table. In our example, the query is using the processing-time 
notion, so a newly appended order would always be joined with the most recent 
version of `LatestRates` when executing the operation.
+Note that the result is not deterministic for processing-time.
+The processing-time temporal join is usually used to enrich the stream with 
external table (i.e. dimension table).
 
-In contrast to [regular joins](#regular-joins), the previous results of the 
temporal table join will not be affected despite the changes on the build side. 
Also, the temporal table join operator is very lightweight and does not keep 
any state.
+In contrast to [regular joins](#regular-joins), the previous results of the 
temporal table join will not be affected despite the changes on the build side.
+* For event-time temporal join, the temporal join operator keeps both left 
table state and right table state and clean up the state by watermark.
+* For processing-time temporal join, the temporal join operator keeps only 
right table state and the data in right state only contains the latest version, 
the state is lightweight; for temporal table that
+ has the ability to lookup external system at runtime, the temporal join 
operator does not need to keep any state, the temporal table join operator is 
very lightweight.
 
 Compared to [interval joins](#interval-joins), temporal table joins do not 
define a time window within which the records will be joined.
-Records from the probe side are always joined with the build side's latest 
version at processing time. Thus, records on the build side might be 
arbitrarily old.
+Records from the probe side are always joined with the build side's version at 
the time specified by the time attribute. Thus, records on the build side might 
be arbitrarily old.
+As time passes, the previous and no longer needed versions of the record (for 
the given primary key) will be removed from the state.
 
-Both [temporal table function join](#join-with-a-temporal-table-function) and 
temporal table join come from the same motivation but have different SQL syntax 
and runtime implementations:
-* The SQL syntax of the temporal table function join is a join UDTF, while the 
temporal table join uses the standard temporal table syntax introduced in 
SQL:2011.
-* The implementation of temporal table function joins actually joins two 
streams and keeps them in state, while temporal table joins just receive the 
only input stream and look up the external database according to the key in the 
record.
-* The temporal table function join is usually used to join a changelog stream, 
while the temporal table join is usually used to join an external table (i.e. 
dimension table).
+Join with a Temporal Table Function
+--------------------------
+
+A join with a temporal table function joins an append-only table (left 
input/probe side) with a temporal table (right input/build side),
+i.e., a table that changes over time and tracks its changes. Please check the 
corresponding page for more information about [temporal 
tables](temporal_tables.html).
 
-Such behaviour makes a temporal table join a good candidate to express stream 
enrichment in relational terms.
+The following example shows an append-only table `Orders` that should be 
joined with the continuously changing currency rates table `RatesHistory`.
 
-In the future, the temporal table join will support the features of temporal 
table function joins, i.e. support to temporal join a changelog stream.
+`Orders` is an append-only table that represents payments for the given 
`amount` and the given `currency`.
+For example at `10:15` there was an order for an amount of `2 Euro`.
 
-### Usage
+{% highlight sql %}
+SELECT * FROM Orders;
+
+rowtime amount currency
+======= ====== =========
+10:15        2 Euro
+10:30        1 US Dollar
+10:32       50 Yen
+10:52        3 Euro
+11:04        5 US Dollar
+{% endhighlight %}
 
-The syntax of temporal table join is as follows:
+`RatesHistory` represents an ever changing append-only table of currency 
exchange rates with respect to `Yen` (which has a rate of `1`).
+For example, the exchange rate for the period from `09:00` to `10:45` of 
`Euro` to `Yen` was `114`. From `10:45` to `11:15` it was `116`.
 
 {% highlight sql %}
-SELECT [column_list]
-FROM table1 [AS <alias1>]
-[LEFT] JOIN table2 FOR SYSTEM_TIME AS OF table1.proctime [AS <alias2>]
-ON table1.column-name1 = table2.column-name1
+SELECT * FROM RatesHistory;
+
+rowtime currency   rate
+======= ======== ======
+09:00   US Dollar   102
+09:00   Euro        114
+09:00   Yen           1
+10:45   Euro        116
+11:15   Euro        119
+11:49   Pounds      108
+{% endhighlight %}
+
+Given that we would like to calculate the amount of all `Orders` converted to 
a common currency (`Yen`).
+
+For example, we would like to convert the following order using the 
appropriate conversion rate for the given `rowtime` (`114`).
+
+{% highlight text %}
+rowtime amount currency
+======= ====== =========
+10:15        2 Euro
+{% endhighlight %}
+
+Without using the concept of [temporal tables](temporal_tables.html), one 
would need to write a query like:
+
+{% highlight sql %}
+SELECT
+  SUM(o.amount * r.rate) AS amount
+FROM Orders AS o,
+  RatesHistory AS r
+WHERE r.currency = o.currency
+AND r.rowtime = (
+  SELECT MAX(rowtime)
+  FROM RatesHistory AS r2
+  WHERE r2.currency = o.currency
+  AND r2.rowtime <= o.rowtime);
+{% endhighlight %}
+
+With the help of a temporal table function `Rates` over `RatesHistory`, we can 
express such a query in SQL as:
+
+{% highlight sql %}
+SELECT
+  o.amount * r.rate AS amount
+FROM
+  Orders AS o,
+  LATERAL TABLE (Rates(o.rowtime)) AS r
+WHERE r.currency = o.currency
 {% endhighlight %}
 
-Currently, only support INNER JOIN and LEFT JOIN. The `FOR SYSTEM_TIME AS OF 
table1.proctime` should be followed after temporal table. `proctime` is a 
[processing time attribute](time_attributes.html#processing-time) of `table1`.
-This means that it takes a snapshot of the temporal table at processing time 
when joining every record from left table.
+Each record from the probe side will be joined with the version of the build 
side table at the time of the correlated time attribute of the probe side 
record.
+In order to support updates (overwrites) of previous values on the build side 
table, the table must define a primary key.
+
+In our example, each record from `Orders` will be joined with the version of 
`Rates` at time `o.rowtime`. The `currency` field has been defined as the 
primary key of `Rates` before and is used to connect both tables in our 
example. If the query were using a processing-time notion, a newly appended 
order would always be joined with the most recent version of `Rates` when 
executing the operation.
+
+Both [temporal table function join](#join-with-a-temporal-table-function) and 
[temporal join](#temporal-joins) come from the same motivation but have 
different SQL syntax and runtime implementations:
+* The SQL syntax of the temporal table function join is a join UDTF, while the 
temporal table join uses the standard temporal table syntax introduced in 
SQL:2011.
+* The feature of temporal table function is the subset of temporal table join, 
and they share some operator implementations, the temporal table function is 
legacy way, and event-time temporal table join
+is supported since Flink 1.12. 
+* The implementation of temporal table function joins actually joins two 
streams and keeps them in state, while temporal table join supports another 
runtime implementation besides this way, 
+i.e.: processing-time temporal table join can keep nothing in state and only 
receive left input stream and then look up the external database according to 
the key in the record.
+* The temporal table function supports both in legacy planner and Blink 
planner, but the temporal table join only supports in Blink planner, the legacy 
planner will be deprecated in the future. 
+
+**Note:** The semantics is problematic both for both processing-time temporal 
table function and processing-time temporal table join that implements by 
keeping two stream in state, temporal table function enable this function, 
+but temporal table join disable this function. The reason is the join 
processing for left stream doesn't wait for the complete snapshot of temporal 
table, the left stream may not found the expected dimension data, this may 
mislead users in production environment, 
+
+To get the complete snapshot of temporal table may need introduce new 
mechanism in Flink SQL in the future.
+
+### Usage
 
-For example, after [defining temporal 
table](temporal_tables.html#defining-temporal-table), we can use it as 
following.
+After [defining temporal table 
function](temporal_tables.html#defining-temporal-table-function), we can start 
using it.
+Temporal table functions can be used in the same way as normal table functions 
would be used.
+
+The following code snippet solves our motivating problem of converting 
currencies from the `Orders` table:
 
 <div class="codetabs" markdown="1">
 <div data-lang="SQL" markdown="1">
@@ -338,17 +390,26 @@ For example, after [defining temporal 
table](temporal_tables.html#defining-tempo
 SELECT
   SUM(o_amount * r_rate) AS amount
 FROM
-  Orders
-  JOIN LatestRates FOR SYSTEM_TIME AS OF o_proctime
-  ON r_currency = o_currency
+  Orders,
+  LATERAL TABLE (Rates(o_proctime))
+WHERE
+  r_currency = o_currency
+{% endhighlight %}
+</div>
+<div data-lang="java" markdown="1">
+{% highlight java %}
+Table result = orders
+    .joinLateral("rates(o_proctime)", "o_currency = r_currency")
+    .select("(o_amount * r_rate).sum as amount");
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val result = orders
+    .joinLateral(rates('o_proctime), 'r_currency === 'o_currency)
+    .select(('o_amount * 'r_rate).sum as 'amount)
 {% endhighlight %}
 </div>
 </div>
-
-<span class="label label-danger">Attention</span> It is only supported in 
Blink planner.
-
-<span class="label label-danger">Attention</span> It is only supported in SQL, 
and not supported in Table API yet.
-
-<span class="label label-danger">Attention</span> Flink does not support event 
time temporal table joins currently.
 
 {% top %}
diff --git a/docs/dev/table/streaming/joins.zh.md 
b/docs/dev/table/streaming/joins.zh.md
index 0bf591f..f5dd6c9 100644
--- a/docs/dev/table/streaming/joins.zh.md
+++ b/docs/dev/table/streaming/joins.zh.md
@@ -68,6 +68,189 @@ WHERE o.id = s.orderId AND
 
 <a name="join-with-a-temporal-table-function"></a>
 
+时态表 Join
+--------------------------
+<span class="label label-danger">注意</span> 只在 Blink planner 中支持。
+<span class="label label-danger">注意</span> 只在 SQL 中支持,Table API 暂不支持。
+
+时态表 Join 意味着对任意表(左输入/探针侧)去关联一个时态表(右输入/构建侧)的版本,时态表可以是一张跟踪所有变更记录的表(例如数据库表的 
changelog,包含多个表快照),也可以是物化所有变更之后的表(例如数据库表,只有最新表快照)。
+请参考[时态表](temporal_tables.html)页面以获取更多信息。
+
+Flink 使用了 SQL:2011 标准引入的时态表 Join 语法,时态表 Join 的语法如下:
+
+{% highlight sql %}
+SELECT [column_list]
+FROM table1 [AS <alias1>]
+[LEFT] JOIN table2 FOR SYSTEM_TIME AS OF table1.{ proctime | rowtime } [AS 
<alias2>]
+ON table1.column-name1 = table2.column-name1
+{% endhighlight %}
+ 
+<a name="processing-time-temporal-joins"></a>
+
+### 基于事件时间的时态 Join
+基于事件时间的时态表 join 使用(左侧输入/探针侧) 的 事件时间 去关联(右侧输入/构建侧) 
[版本表](temporal_tables.html#声明版本表) 对应的版本。
+基于事件时间的时态表 join 仅支持关版本表或版本视图,版本表或版本视图只能是一个 changelog 流。 但是,Flink 支持将 
append-only 流转换成 changelog 流,因此版本表也可以来自一个 append-only 流。
+查看[声明版本视图](temporal_tables.html#声明版本视图) 获取更多的信息关于如何声明一张来自 append-only 流的版本表。
+
+将事件时间作为时间属性时,可将 _过去_ 时间属性与时态表一起使用。这允许对两个表中在相同时间点的记录执行 Join 操作。
+与基于处理时间的时态 Join 相比,时态表不仅将构建侧记录的最新版本(是否最新由所定义的主键所决定)保存在 state 中,同时也会存储自上一个 
watermarks 以来的所有版本(按时间区分)。
+
+例如,在探针侧表新插入一条事件时间时间为 `12:30:00` 的记录,它将和构建侧表时间点为 `12:30:00` 
的版本根据[时态表的概念](temporal_tables.html)进行 Join 运算。
+因此,新插入的记录仅与时间戳小于等于 `12:30:00` 的记录进行 Join 计算(由主键决定哪些时间点的数据将参与计算)。
+
+通过定义事件时间,[watermarks]({{ site.baseurl }}/dev/event_time.html) 允许 Join 
运算不断向前滚动,丢弃不再需要的构建侧快照。因为不再需要时间戳更低或相等的记录。
+
+下面的例子展示了订单流关联产品表这个场景举例,`orders` 表包含了来自 Kafka 的实时订单流,`product_changelog` 
表来自数据库表 `products` 的 changelog , 产品的价格在数据库表 `products` 中是随时间实时变化的。
+
+{% highlight sql %}
+SELECT * FROM product_changelog;
+
+(changelog kind)  update_time product_name price
+================= =========== ============ ===== 
++(INSERT)         00:01:00    scooter      11.11
++(INSERT)         00:02:00    basketball   23.11
+-(UPDATE_BEFORE)  12:00:00    scooter      11.11
++(UPDATE_AFTER)   12:00:00    scooter      12.99  <= 产品 `scooter` 在 `12:00:00` 
时涨价到了 `12.99`
+-(UPDATE_BEFORE)  12:00:00    basketball   23.11 
++(UPDATE_AFTER)   12:00:00    basketball   19.99  <= 产品 `basketball` 在 
`12:00:00` 时降价到了 `19.99`
+-(DELETE)         18:00:00    scooter      12.99  <= 产品 `scooter` 在 `18:00:00` 
从数据库表中删除
+{% endhighlight %}
+
+如果我们想输出 `product_changelog` 表在 `10:00:00` 对应的版本,表的内容如下所示:
+{% highlight sql %}
+update_time  product_id product_name price
+===========  ========== ============ ===== 
+00:01:00     p_001      scooter      11.11
+00:02:00     p_002      basketball   23.11
+{% endhighlight %}
+
+如果我们想输出 `product_changelog` 表在 `13:00:00` 对应的版本,表的内容如下所示:
+{% highlight sql %}
+update_time  product_id product_name price
+===========  ========== ============ ===== 
+12:00:00     p_001      scooter      12.99
+12:00:00     p_002      basketball   19.99
+{% endhighlight %}
+
+通过基于事件时间的时态表 join, 我们可以 join 上版本表中的不同版本:
+{% highlight sql %}
+CREATE TABLE orders (
+  order_id STRING,
+  product_id STRING,
+  order_time TIMESTAMP(3),
+  WATERMARK FOR order_time AS order_time  -- defines the necessary event time
+) WITH (
+...
+);
+
+-- 设置会话的时间区间, changelog 里的数据库操作时间是以 epoch 开始的毫秒数存储的,
+-- 在从毫秒转化为时间戳时,Flink SQL 会使用会话的时间区间
+-- 因此,请根据 changelog 中的数据库操作时间设置合适的时间区间
+SET table.local-time-zone=UTC;
+
+-- 声明一张版本表
+CREATE TABLE product_changelog (
+  product_id STRING,
+  product_name STRING,
+  product_price DECIMAL(10, 4),
+  update_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL, -- 
注意:自动从毫秒数转为时间戳
+  PRIMARY KEY(product_id) NOT ENFORCED,      -- (1) defines the primary key 
constraint
+  WATERMARK FOR update_time AS update_time   -- (2) defines the event time by 
watermark                               
+) WITH (
+  'connector' = 'kafka',
+  'topic' = 'products',
+  'scan.startup.mode' = 'earliest-offset',
+  'properties.bootstrap.servers' = 'localhost:9092',
+  'value.format' = 'debezium-json'
+);
+
+-- 基于事件时间的时态表 Join
+SELECT
+  order_id,
+  order_time,
+  product_name,
+  product_time,
+  price
+FROM orders AS O
+LEFT JOIN product_changelog FOR SYSTEM_TIME AS OF O.order_time AS P
+ON O.product_id = P.product_id;
+
+order_id order_time product_name product_time price
+======== ========== ============ ============ =====
+o_001    00:01:00   scooter      00:01:00     11.11
+o_002    00:03:00   basketball   00:02:00     23.11
+o_003    12:00:00   scooter      12:00:00     12.99
+o_004    12:00:00   basketball   12:00:00     19.99
+o_005    18:00:00   NULL         NULL         NULL
+{% endhighlight %}
+
+基于事件时间的时态表 Join 通常用在通过 changelog 丰富流上数据的场景。 
+
+**注意**: 基于事件时间的时态表 Join 是通过左右两侧的 watermark 触发,请确保为 join 两侧的表设置了合适的 watermark。
+
+**注意**: 基于事件时间的时态表 Join 的 join key 必须包含时态表的主键,例如:表 `product_changelog` 的主键 
`P.product_id` 必须包含在 join 条件 `O.product_id = P.product_id` 中。
+
+### 基于处理时间的时态 Join
+
+基于处理时间的时态表 join 使用任意表 (左侧输入/探针侧) 的 处理时间 去关联 (右侧输入/构建侧) 
[普通表](temporal_tables.html#声明普通表)的最新版本.
+基于处理时间的时态表 join 当前只支持关联普通表或普通视图,且支持普通表或普通视图当前只能是 append-only 流。
+
+如果将处理时间作为时间属性,_过去_ 时间属性将无法与时态表一起使用。根据定义,处理时间总会是当前时间戳。
+因此,关联时态表的调用将始终返回底层表的最新已知版本,并且底层表中的任何更新也将立即覆盖当前值。
+
+可以将处理时间的时态 Join 视作简单的 `HashMap <K,V>`,HashMap 中存储来自构建侧的所有记录。
+当来自构建侧的新插入的记录与旧值具有相同的 Key 时,旧值会被覆盖。
+探针侧的每条记录将总会根据 `HashMap` 的最新/当前状态来计算。
+
+接下来的示例展示了订单流 `Orders` 该如何与实时变化的汇率表 `Lates` 进行基于处理时间的时态 Join 操作,`LatestRates` 
总是表示 HBase 表 `Rates` 的最新内容。
+
+表 `LastestRates` 中的数据在时间点 `10:15` 和 `10:30` 时是相等的。欧元汇率在时间点 `10:52` 从 114 变化至 
116 。
+
+表 `Orders` 包含了金额字段 `amount` 和货币字段 `currency` 的支付记录数据。例如在 `10:15` 有一笔金额为 `2` 
欧元的订单记录。
+
+{% highlight sql %}
+SELECT * FROM Orders;
+
+amount currency
+====== =========
+     2 Euro             <== arrived at time 10:15
+     1 US Dollar        <== arrived at time 10:30
+     2 Euro             <== arrived at time 10:52
+{% endhighlight %}
+
+基于以上,我们想要计算所有 `Orders` 表的订单金额总和,并同时转换为对应成日元的金额。
+
+例如,我们想要以表 `LatestRates` 中的汇率将以下订单转换,则结果将为:
+
+{% highlight text %}
+amount currency     rate   amout*rate
+====== ========= ======= ============
+     2 Euro          114          228    <== arrived at time 10:15
+     1 US Dollar     102          102    <== arrived at time 10:30
+     2 Euro          116          232    <== arrived at time 10:52
+{% endhighlight %}
+
+通过时态表 Join,我们可以将上述操作表示为以下 SQL 查询:
+
+{% highlight sql %}
+SELECT
+  o.amout, o.currency, r.rate, o.amount * r.rate
+FROM
+  Orders AS o
+  JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
+  ON r.currency = o.currency
+{% endhighlight %}
+
+探针侧表中的每个记录都将与构建侧表的当前版本所关联。 在此示例中,查询使用`处理时间`作为处理时间,因而新增订单将始终与表 `LatestRates` 
的最新汇率执行 Join 操作。注意,结果对于处理时间来说不是确定的。
+基于处理时间的时态表 Join 通常用在通过外部表(例如维度表)丰富流上数据的场景。 
+
+与[常规 Join](#常规-join) 相比,尽管构建侧表的数据发生了变化,但时态表 Join 的变化前结果不会随之变化。
+* 对于基于事件时间的时态 Join, join 算子保留 Join 两侧流的状态并通过 watermark 清理。 
+* 对于基于处理时间的时态 Join, join 算子保留仅保留右侧(构建侧)的状态,且构建侧的状态只包含数据的最新版本,右侧的状态是轻量级的; 
对于在运行时有能力查询外部系统的时态表,join 算子还可以优化成不保留任何状态,此时算子是非常轻量级的。
+
+与[时间区间 Join](#时间区间-join) 相比,时态表 Join 没有定义决定构建侧记录所属的将被 Join 时间窗口。
+探针侧的记录将总是与构建侧在对应`处理时间`的最新数据执行 Join,因而构建侧的数据可能是任意旧的。
+ 
 时态表函数 Join
 --------------------------
 
@@ -112,7 +295,7 @@ rowtime amount currency
 10:15        2 Euro
 {% endhighlight %}
 
-如果没有[时态表]({%link dev/table/streaming/temporal_tables.zh.md %})概念,则需要写一段这样的查询:
+如果没有[时态表](temporal_tables.html)概念,则需要写一段这样的查询:
 
 {% highlight sql %}
 SELECT
@@ -142,13 +325,23 @@ WHERE r.currency = o.currency
 
 在示例中,`Orders` 表中的每一条记录都与时间点 `o.rowtime` 的 `Rates` 进行 Join 运算。`currency` 
字段已被定义为 `Rates` 表的主键,在示例中该字段也被用于连接两个表。如果该查询采用的是处理时间,则在执行时新增的订单将始终与最新的 `Rates` 
执行 Join。
 
-与[常规 Join](#regular-joins) 相反,时态表函数 Join 意味着如果在构建侧新增一行记录将不会影响之前的结果。这同时使得 Flink 
能够限制必须保存在 state 中的元素数量(因为不再需要保存之前的状态)。
+与[常规 Join](#常规-join) 相反,时态表函数 Join 意味着如果在构建侧新增一行记录将不会影响之前的结果。这同时使得 Flink 
能够限制必须保存在 state 中的元素数量(因为不再需要保存之前的状态)。
 
-与[时间区间 Join](#interval-joins) 相比,时态表 Join 没有定义限制了每次参与 Join 
运算的元素的时间范围。探针侧的记录总是会和构建侧中对应特定时间属性的数据进行 Join 
操作。因而在构建侧的记录可以是任意时间之前的。随着时间流动,之前产生的和不再需要的给定 primary key 所对应的记录将从 state 中移除。
+与[时间区间 Join](#时间区间-join) 相比,时态表 Join 没有定义限制了每次参与 Join 
运算的元素的时间范围。探针侧的记录总是会和构建侧中对应特定时间属性的数据进行 Join 
操作。因而在构建侧的记录可以是任意时间之前的。随着时间流动,之前产生的和不再需要的给定 primary key 所对应的记录将从 state 中移除。
 
 这种做法让时态表 Join 成为一个很好的用于表达不同流之间关联的方法。
 
-<a name="usage"></a>
+[时态表函数 Join](#时态表函数-join) 和[时态表 Join](#时态表-join)都有类似的功能,但是有不同的 SQL 语法和 runtime 
实现:
+
+* 时态表函数 Join 的 SQL 语法是一种 Join 用户定义生成表函数(UDTF,User-Defined Table-Generating 
Functions),而时态表 Join 使用了 SQL:2011 标准引入的标准时态表语法。
+* 时态表 Join 的覆盖了时态表函数 Join 支持的所有功能,两者共享部分算子实现,基于事件时间的时态表 Join 从 Flink 1.12 开始支持。
+* 时态表函数 Join 总是保留数据流的状态,但在一些情况下,时态表 Join 可以不用保留流的状态,即:基于处理时间的时态表 Join 中, join 
算子可以在运行时根据记录的键值查找外部数据库而不是从状态中获取。
+* 时态表函数 Join 在 legacy planer 和 Blink planer 中均支持,而时态表 Join 仅在 Blink planner 
中支持,legacy planner 在将来会被废弃。。
+
+**注意**: 基于处理时间的时态 Join 中, 如果右侧表不是可以直接查询外部系统的表而是普通的数据流,时态表函数 Join 和 时态表 Join 
的语义都有问题,时态表函数 Join 仍然允许使用,但是时态表 Join 禁用了该功能。
+语义问题的原因是 join 算子没办法知道右侧时态表(构建侧)的完整快照是否到齐,这可能导致左侧的流在启动时关联不到用户期待的数据, 
在生产环境中可能误导用户。
+
+Flink SQL 在未来可能需要引入新的机制去获取右侧时态表的完整快照。
 
 ### 用法
 
@@ -183,166 +376,4 @@ val result = orders
 {% endhighlight %}
 </div>
 </div>
-
-**注意**: 时态 Join 中的 State 保留(在[查询配置]({%link 
dev/table/streaming/query_configuration.zh.md 
%})中定义)还未实现。这意味着计算的查询结果所需的状态可能会无限增长,具体数量取决于历史记录表的不重复主键个数。
-
-<a name="processing-time-temporal-joins"></a>
-
-### 基于处理时间的时态 Join
-
-如果将处理时间作为时间属性,将无法将 _过去_ 时间属性作为参数传递给时态表函数。
-根据定义,处理时间总会是当前时间戳。因此,基于处理时间的时态表函数将始终返回基础表的最新已知版本,时态表函数的调用将始终返回基础表的最新已知版本,并且基础历史表中的任何更新也将立即覆盖当前值。
-
-只有最新版本的构建侧记录(是否最新由所定义的主键所决定)会被保存在 state 中。
-构建侧的更新不会对之前 Join 的结果产生影响。
-
-可以将处理时间的时态 Join 视作简单的 `HashMap <K,V>`,HashMap 中存储来自构建侧的所有记录。
-当来自构建侧的新插入的记录与旧值具有相同的 Key 时,旧值会被覆盖。
-探针侧的每条记录将总会根据 `HashMap` 的最新/当前状态来计算。
-
-<a name="event-time-temporal-joins"></a>
-
-### 基于事件时间的时态 Join
-
-将事件时间作为时间属性时,可将 _过去_ 时间属性作为参数传递给时态表函数。这允许对两个表中在相同时间点的记录执行 Join 操作。
-
-与基于处理时间的时态 Join 相比,时态表不仅将构建侧记录的最新版本(是否最新由所定义的主键所决定)保存在 state 中,同时也会存储自上一个 
watermarks 以来的所有版本(按时间区分)。
-
-例如,在探针侧表新插入一条事件时间时间为 `12:30:00` 的记录,它将和构建侧表时间点为 `12:30:00` 
的版本根据[时态表的概念]({%link dev/table/streaming/temporal_tables.zh.md %})进行 Join 运算。
-因此,新插入的记录仅与时间戳小于等于 `12:30:00` 的记录进行 Join 计算(由主键决定哪些时间点的数据将参与计算)。
-
-通过定义事件时间,[watermarks]({%link dev/event_time.zh.md %}) 允许 Join 
运算不断向前滚动,丢弃不再需要的构建侧快照。因为不再需要时间戳更低或相等的记录。
-
-<a name="join-with-a-temporal-table"></a>
-
-时态表 Join
---------------------------
-
-时态表 Join 意味着对任意表(左输入/探针侧)和一个时态表(右输入/构建侧)执行的 Join 
操作,即随时间变化的的扩展表。请参考相应的页面以获取更多有关[时态表]({%link 
dev/table/streaming/temporal_tables.zh.md %}#temporal-table)的信息。
-
-<span class="label label-danger">注意</span> 不是任何表都能用作时态表,能作为时态表的表必须实现接口 
`LookupableTableSource`。接口 `LookupableTableSource` 的实例只能作为时态表用于时态 Join 
。查看此页面获取更多关于[如何实现接口 `LookupableTableSource`]({%link dev/table/sourceSinks.zh.md 
%}#defining-a-tablesource-for-lookups) 的详细内容。
-
-接下来的示例展示了订单流 `Orders` 该如何与实时变化的汇率表 `Lates` 进行 Join 操作。
-
-`LatestRates` 是实时更新的汇率表。在时间点 `10:15`, `10:30`, `10:52` 中,表 `LatestRates` 
中的内容分别如下:
-
-{% highlight sql %}
-10:15> SELECT * FROM LatestRates;
-
-currency   rate
-======== ======
-US Dollar   102
-Euro        114
-Yen           1
-
-10:30> SELECT * FROM LatestRates;
-
-currency   rate
-======== ======
-US Dollar   102
-Euro        114
-Yen           1
-
-
-10:52> SELECT * FROM LatestRates;
-
-currency   rate
-======== ======
-US Dollar   102
-Euro        116     <==== changed from 114 to 116
-Yen           1
-{% endhighlight %}
-
-表 `LastestRates` 中的数据在时间点 `10:15` 和 `10:30` 时是相等的。欧元汇率在时间点 `10:52` 从 114 变化至 
116 。
-
-表 `Orders` 包含了金额字段 `amount` 和货币字段 `currency` 的支付记录数据。例如在 `10:15` 有一笔金额为 `2` 
欧元的订单记录。
-
-{% highlight sql %}
-SELECT * FROM Orders;
-
-amount currency
-====== =========
-     2 Euro             <== arrived at time 10:15
-     1 US Dollar        <== arrived at time 10:30
-     2 Euro             <== arrived at time 10:52
-{% endhighlight %}
-
-基于以上,我们想要计算所有 `Orders` 表的订单金额总和,并同时转换为对应成日元的金额。
-
-例如,我们想要以表 `LatestRates` 中的汇率将以下订单转换,则结果将为:
-
-{% highlight text %}
-amount currency     rate   amout*rate
-====== ========= ======= ============
-     2 Euro          114          228    <== arrived at time 10:15
-     1 US Dollar     102          102    <== arrived at time 10:30
-     2 Euro          116          232    <== arrived at time 10:52
-{% endhighlight %}
-
-
-通过时态表 Join,我们可以将上述操作表示为以下 SQL 查询:
-
-{% highlight sql %}
-SELECT
-  o.amout, o.currency, r.rate, o.amount * r.rate
-FROM
-  Orders AS o
-  JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
-  ON r.currency = o.currency
-{% endhighlight %}
-
-探针侧表中的每个记录都将与构建侧表的当前版本所关联。 在此示例中,查询使用`处理时间`作为处理时间,因而新增订单将始终与表 `LatestRates` 
的最新汇率执行 Join 操作。 注意,结果对于处理时间来说不是确定的。
-
-与[常规 Join](#regular-joins) 相比,尽管构建侧表的数据发生了变化,但时态表 Join 的变化前结果不会随之变化。而且时态表 Join 
运算非常轻量级且不会保留任何状态。
-
-与[时间区间 Join](#interval-joins) 相比,时态表 Join 没有定义决定哪些记录将被 Join 的时间窗口。
-探针侧的记录将总是与构建侧在对应`处理时间`的最新数据执行 Join。因而构建侧的数据可能是任意旧的。
-
-[时态表函数 Join](#join-with-a-temporal-table-function) 和时态表 Join 都有类似的功能,但是有不同的 
SQL 语法和 runtime 实现:
-
-* 时态表函数 Join 的 SQL 语法是一种 Join 用户定义生成表函数(UDTF,User-Defined Table-Generating 
Functions),而时态表 Join 使用了 SQL:2011 标准引入的标准时态表语法。
-* 时态表函数 Join 的实现实际上是 Join 两个流并保存在 state 中,而时态表 Join 只接受唯一的输入流,并根据记录的键值查找外部数据库。
-* 时态表函数 Join 通常用于与变更日志流执行 Join,而时态表 Join 通常与外部表(例如维度表)执行 Join 操作。
-
-这种做法让时态表 Join 成为一个很好的用于表达不同流之间关联的方法。
-
-将来,时态表 Join 将支持时态表函数 Join 的功能,即支持时态 Join 变更日志流。
-
-<a name="usage-1"></a>
-
-### 用法
-
-时态表 Join 的语法如下:
-
-{% highlight sql %}
-SELECT [column_list]
-FROM table1 [AS <alias1>]
-[LEFT] JOIN table2 FOR SYSTEM_TIME AS OF table1.proctime [AS <alias2>]
-ON table1.column-name1 = table2.column-name1
-{% endhighlight %}
-
-目前只支持 INNER JOIN 和 LEFT JOIN,`FOR SYSTEM_TIME AS OF table1.proctime` 应位于时态表之后. 
`proctime` 是 `table1` 的[处理时间属性]({%link 
dev/table/streaming/time_attributes.zh.md %}#processing-time)。
-这意味着在 Join 计算中连接左侧表中的每个记录时会为时态表产生快照。
-
-例如在[定义时态表]({%link dev/table/streaming/temporal_tables.zh.md 
%}#defining-temporal-table)之后,我们可以用以下方式使用:
-
-<div class="codetabs" markdown="1">
-<div data-lang="SQL" markdown="1">
-{% highlight sql %}
-SELECT
-  SUM(o_amount * r_rate) AS amount
-FROM
-  Orders
-  JOIN LatestRates FOR SYSTEM_TIME AS OF o_proctime
-  ON r_currency = o_currency
-{% endhighlight %}
-</div>
-</div>
-
-<span class="label label-danger">注意</span> 只在 Blink planner 中支持。
-
-<span class="label label-danger">注意</span> 只在 SQL 中支持,Table API 暂不支持。
-
-<span class="label label-danger">注意</span> Flink 目前不支持事件时间时态表的 Join 。
-
 {% top %}
diff --git a/docs/dev/table/streaming/temporal_tables.md 
b/docs/dev/table/streaming/temporal_tables.md
index e9fd57d..8625013 100644
--- a/docs/dev/table/streaming/temporal_tables.md
+++ b/docs/dev/table/streaming/temporal_tables.md
@@ -20,15 +20,18 @@ software distributed under the License is distributed on an
 KIND, either express or implied.  See the License for the
 specific language governing permissions and limitations
 under the License.
--->
+--> 
 
-Temporal Tables represent a concept of a (parameterized) view on a changing 
table that returns the content of a table at a specific point in time.
+A Temporal table is a table that evolves over time -  otherwise known in Flink 
as a [dynamic table](dynamic_tables.html). Rows in a temporal table are 
associated with one or more temporal periods and all Flink tables are 
temporal(dynamic).
 
-The changing table can either be a changing history table which tracks the 
changes (e.g. database changelogs) or a changing dimension table which 
materializes the changes (e.g. database tables).
+A temporal table contains one or more versioned table snapshots, it can be a 
changing history table which tracks the changes(e.g. database changelog, 
contains all snapshots) or a changing dimensioned table which materializes the 
changes(e.g. database table, contains the latest snapshot). 
 
-For the changing history table, Flink can keep track of the changes and allows 
for accessing the content of the table at a certain point in time within a 
query. In Flink, this kind of table is represented by a *Temporal Table 
Function*.
+**Version**: A temporal table can split into a set of versioned table 
snapshots, the version in table snapshots represents the valid life circle of 
rows, the start time and the end time of the valid period can be assigned by 
users. 
+Temporal table can split to `versioned table` and `regular table` according to 
the table can tracks its history version or not.
 
-For the changing dimension table, Flink allows for accessing the content of 
the table at processing time within a query. In Flink, this kind of table is 
represented by a *Temporal Table*.
+**Versioned table**: If the rows a in temporal table can track its history 
changes and visit its history versions, we call this a versioned table. Tables 
that comes from a database changelog can be defined as a versioned table.
+
+**Regular table**: If the row in temporal table can only track and visit its 
latest version,we call this kind of temporal table as regular table. Tables 
that comes from a database or HBase can be defined as a regular table.
 
 * This will be replaced by the TOC
 {:toc}
@@ -36,87 +39,236 @@ For the changing dimension table, Flink allows for 
accessing the content of the
 Motivation
 ----------
 
-### Correlate with a changing history table
+### Correlate with a versioned table
+Given a scenario the order stream correlates the dimension table product, the 
table `orders` comes from kafka which contains the real time orders, the table 
`product_changelog` comes from the changelog of the database table `products`,
+ the product price in table `products` is changing over time. 
 
-Let's assume that we have the following table `RatesHistory`.
+{% highlight sql %}
+SELECT * FROM product_changelog;
+
+(changelog kind)  update_time  product_id product_name price
+================= ===========  ========== ============ ===== 
++(INSERT)         00:01:00     p_001      scooter      11.11
++(INSERT)         00:02:00     p_002      basketball   23.11
+-(UPDATE_BEFORE)  12:00:00     p_001      scooter      11.11
++(UPDATE_AFTER)   12:00:00     p_001      scooter      12.99
+-(UPDATE_BEFORE)  12:00:00     p_002      basketball   23.11 
++(UPDATE_AFTER)   12:00:00     p_002      basketball   19.99
+-(DELETE)         18:00:00     p_001      scooter      12.99 
+{% endhighlight %}
 
+The table `product_changelog` represents an ever growing changelog of database 
table `products`,  for example, the initial price of product `scooter` is 
`11.11` at `00:01:00`, and the price increases to `12.99` at `12:00:00`,
+ the product item is deleted from the table `products` at `18:00:00`.
+
+Given that we would like to output the version of `product_changelog` table of 
the time `10:00:00`, the following table shows the result. 
 {% highlight sql %}
-SELECT * FROM RatesHistory;
+update_time  product_id product_name price
+===========  ========== ============ ===== 
+00:01:00     p_001      scooter      11.11
+00:02:00     p_002      basketball   23.11
+{% endhighlight %}
 
-rowtime currency   rate
-======= ======== ======
-09:00   US Dollar   102
-09:00   Euro        114
-09:00   Yen           1
-10:45   Euro        116
-11:15   Euro        119
-11:49   Pounds      108
+Given that we would like to output the version of `product_changelog` table of 
the time `13:00:00`, the following table shows the result. 
+{% highlight sql %}
+update_time  product_id product_name price
+===========  ========== ============ ===== 
+12:00:00     p_001      scooter      12.99
+12:00:00     p_002      basketball   19.99
 {% endhighlight %}
 
-`RatesHistory` represents an ever growing append-only table of currency 
exchange rates with respect to `Yen` (which has a rate of `1`).
-For example, the exchange rate for the period from `09:00` to `10:45` of 
`Euro` to `Yen` was `114`. From `10:45` to `11:15` it was `116`.
+In above example, the specific version of the table is tracked by 
`update_time` and `product_id`,  the `product_id` would be a primary key for 
`product_changelog` table and `update_time` would be the event time.
+
+In Flink, this is represented by a [*versioned 
table*](#defining-versioned-table).
+
+### Correlate with a regular table
 
-Given that we would like to output all current rates at the time `10:58`, we 
would need the following SQL query to compute a result table:
+On the other hand, some use cases require to join a regular table which is an 
external database table.
 
+Let's assume that `LatestRates` is a table (e.g. stored in HBase) which is 
materialized with the latest rates. The `LatestRates` always represents the 
latest content of hbase table `rates`.
+ 
+Then the content of `LatestRates` table when we query at time `10:15:00` is:
 {% highlight sql %}
-SELECT *
-FROM RatesHistory AS r
-WHERE r.rowtime = (
-  SELECT MAX(rowtime)
-  FROM RatesHistory AS r2
-  WHERE r2.currency = r.currency
-  AND r2.rowtime <= TIME '10:58');
+10:15:00 > SELECT * FROM LatestRates;
+
+currency  rate
+========= ====
+US Dollar 102
+Euro      114
+Yen       1
+{% endhighlight %}
+
+Then the content of `LatestRates` table when we query at time `11:00:00` is:
+{% highlight sql %}
+11:00:00 > SELECT * FROM LatestRates;
+
+currency  rate
+========= ====
+US Dollar 102
+Euro      116
+Yen       1
 {% endhighlight %}
 
-The correlated subquery determines the maximum time for the corresponding 
currency that is lower or equal than the desired time. The outer query lists 
the rates that have a maximum timestamp.
+In Flink, this is represented by a [*regular Table*](#defining-regular-table).
+
+Temporal Table
+--------------
+<span class="label label-danger">Attention</span> This is only supported in 
Blink planner.
 
-The following table shows the result of such a computation. In our example, 
the update to `Euro` at `10:45` is taken into account, however, the update to 
`Euro` at `11:15` and the new entry of `Pounds` are not considered in the 
table's version at time `10:58`.
+Flink uses primary key constraint and event time to define both versioned 
table and versioned view.
 
-{% highlight text %}
-rowtime currency   rate
-======= ======== ======
-09:00   US Dollar   102
-09:00   Yen           1
-10:45   Euro        116
+### Defining Versioned Table
+The table is a versioned table in Flink only is the table contains primary key 
constraint and event time.
+{% highlight sql %}
+-- Define a versioned table
+CREATE TABLE product_changelog (
+  product_id STRING,
+  product_name STRING,
+  product_price DECIMAL(10, 4),
+  update_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
+  PRIMARY KEY(product_id) NOT ENFORCED,      -- (1) defines the primary key 
constraint
+  WATERMARK FOR update_time AS update_time   -- (2) defines the event time by 
watermark                               
+) WITH (
+  'connector' = 'kafka',
+  'topic' = 'products',
+  'scan.startup.mode' = 'earliest-offset',
+  'properties.bootstrap.servers' = 'localhost:9092',
+  'value.format' = 'debezium-json'
+);
 {% endhighlight %}
 
-The concept of *Temporal Tables* aims to simplify such queries, speed up their 
execution, and reduce Flink's state usage. A *Temporal Table* is a 
parameterized view on an append-only table that interprets the rows of the 
append-only table as the changelog of a table and provides the version of that 
table at a specific point in time. Interpreting the append-only table as a 
changelog requires the specification of a primary key attribute and a timestamp 
attribute. The primary key determines [...]
+Line `(1)` defines the primary key constraint for table `product_changelog`, 
Line `(2)` defines the `update_time` as event time for table 
`product_changelog`,
+thus table `product_changelog` is a versioned table.
 
-In the above example `currency` would be a primary key for `RatesHistory` 
table and `rowtime` would be the timestamp attribute.
+**Note**: The grammar `METADATA FROM 'value.source.timestamp' VIRTUAL` means 
extract the database
+operation execution time for every changelog, it's strongly recommended 
defines the database operation execution time 
+as event time rather than ingestion-time or time in the record, otherwise the 
version extract from the changelog may
+mismatch with the version in database.
+ 
+### Defining Versioned View
 
-In Flink, this is represented by a [*Temporal Table 
Function*](#temporal-table-function).
+Flink also supports defining versioned view only if the view contains unique 
key constraint and event time.
+ 
+Let’s assume that we have the following table `RatesHistory`:
+{% highlight sql %}
+-- Define an append-only table
+CREATE TABLE RatesHistory (
+    currency_time TIMESTAMP(3),
+    currency STRING,
+    rate DECIMAL(38, 10),
+    WATERMARK FOR currency_time AS currency_time   -- defines the event time
+) WITH (
+  'connector' = 'kafka',
+  'topic' = 'rates',
+  'scan.startup.mode' = 'earliest-offset',
+  'properties.bootstrap.servers' = 'localhost:9092',
+  'format' = 'json'                                -- this is an append only 
source
+)
+{% endhighlight %}
 
-### Correlate with a changing dimension table
+Table `RatesHistory` represents an ever growing append-only table of currency 
exchange rates with respect to 
+Yen (which has a rate of 1). For example, the exchange rate for the period 
from 09:00 to 10:45 of Euro to Yen was 114.
+From 10:45 to 11:15 it was 116.
+
+{% highlight sql %}
+SELECT * FROM RatesHistory;
 
-On the other hand, some use cases require to join a changing dimension table 
which is an external database table.
+currency_time currency  rate
+============= ========= ====
+09:00:00      US Dollar 102
+09:00:00      Euro      114
+09:00:00      Yen       1
+10:45:00      Euro      116
+11:15:00      Euro      119
+11:49:00      Pounds    108
+{% endhighlight %}
 
-Let's assume that `LatestRates` is a table (e.g. stored in HBase) which is 
materialized with the latest rate. The `LatestRates` is the materialized 
history `RatesHistory`. Then the content of `LatestRates` table at time `10:58` 
will be:
+To define a versioned table on `RatesHistory`, Flink supports defining a 
versioned view 
+by [deduplication query]({{ site.baseurl 
}}/dev/table/sql/queries.html#deduplication) which produces an ordered changelog
+stream with an inferred primary key(`currency`) and event 
time(`currency_time`).
 
-{% highlight text %}
-10:58> SELECT * FROM LatestRates;
-currency   rate
-======== ======
-US Dollar   102
-Yen           1
-Euro        116
+{% highlight sql %}
+-- Define a versioned view
+CREATE VIEW versioned_rates AS              
+SELECT currency, rate, currency_time            -- (1) `currency_time` keeps 
the event time
+  FROM (
+      SELECT *,
+      ROW_NUMBER() OVER (PARTITION BY currency  -- (2) the inferred unique key 
`currency` can be a primary key
+         ORDER BY currency_time DESC) AS rowNum 
+      FROM RatesHistory)
+WHERE rowNum = 1; 
+
+-- the view `versioned_rates` will produce changelog as the following.
+(changelog kind) currency_time currency   rate
+================ ============= =========  ====
++(INSERT)        09:00:00      US Dollar  102
++(INSERT)        09:00:00      Euro       114
++(INSERT)        09:00:00      Yen        1
++(UPDATE_AFTER)  10:45:00      Euro       116
++(UPDATE_AFTER)  11:15:00      Euro       119
++(INSERT)        11:49:00      Pounds     108
+{% endhighlight sql %}
+
+Line `(1)` defines the `currency_time` as event time for view 
`versioned_rates`, Line `(2)` defines the primary key constraint
+for view `versioned_rates`, thus view `versioned_rates` is a versioned view.
+
+The deduplication query in view will be optimized in Flink and produce 
changelog effectively, the produced changelog keep primary key and event times.
+
+Given that we would like to output the version of `versioned_rates` view of 
the time `11:00:00`, the following table shows the result: 
+{% highlight sql %}
+currency_time currency   rate  
+============= ========== ====
+09:00:00      US Dollar  102
+09:00:00      Yen        1
+10:45:00      Euro       116
 {% endhighlight %}
 
-The content of `LatestRates` table at time `12:00` will be:
+Given that we would like to output the version of `versioned_rates` view of 
the time `12:00:00`, the following table shows the result: 
+{% highlight sql %}
+currency_time currency   rate  
+============= ========== ====
+09:00:00      US Dollar  102
+09:00:00      Yen        1
+10:45:00      Euro       119
+11:49:00      Pounds     108
+{% endhighlight %}
 
-{% highlight text %}
-12:00> SELECT * FROM LatestRates;
-currency   rate
-======== ======
-US Dollar   102
-Yen           1
-Euro        119
-Pounds      108
+### Defining Regular Table
+ 
+Regular table definition is same with Flink table DDL, see also the page about 
[create table]({{ site.baseurl }}/dev/table/sql/create.html#create-table) for 
more information about how to create a regular table.
+ 
+{% highlight sql %}
+-- Define an HBase table with DDL, then we can use it as a temporal table in 
sql
+-- Column 'currency' is the rowKey in HBase table
+ CREATE TABLE LatestRates (   
+     currency STRING,   
+     fam1 ROW<rate DOUBLE>   
+ ) WITH (   
+    'connector' = 'hbase-1.4',   
+    'table-name' = 'rates',   
+    'zookeeper.quorum' = 'localhost:2181'   
+ );
 {% endhighlight %}
 
-In Flink, this is represented by a [*Temporal Table*](#temporal-table).
+<span class="label label-danger">Attention</span>
+Arbitrary table can use as temporal table in processing-time temporal join 
theoretically, but currently the supported tale is the 
+table backed by a `LookupableTableSource`. A `LookupableTableSource` can only 
be used for processing-time temporal join as a temporal table. 
+
+The table defines with `LookupableTableSource` means the table must has lookup 
ability to look up an external storage system 
+by one or more keys during runtime. The current supported regular table in 
processing-time temporal join includes 
+[JDBC]({{ site.baseurl }}/dev/table/connectors/jdbc.html), [HBase]({{ 
site.baseurl }}/dev/table/connectors/hbase.html) 
+and [Hive]({{ site.baseurl 
}}/dev/table/hive/hive_streaming.html#hive-table-as-temporal-tables).
+
+See also the page about [how to define 
LookupableTableSource](../sourceSinks.html#lookup-table-source).
+
+Using arbitrary table as temporal table in processing time temporal table join 
will be supported in the near future. 
 
 Temporal Table Function
 ------------------------
+The temporal table function is a legacy way to define ad temporal table and 
access the temporal table content. In order to access the data in a temporal 
table,  now we can use
+DDL to define a temporal table.
+
+The main difference between Temporal Table DDL and Temporal Table Function are 
the temporal table DDL can directly use in pure SQL but temporal table function 
can not; the temporal table DDL support defines versioned table from changelog 
stream and append-only stream but temporal table function only supports 
append-only stream.
+
 
 In order to access the data in a temporal table, one must pass a [time 
attribute](time_attributes.html) that determines the version of the table that 
will be returned.
 Flink uses the SQL syntax of [table functions]({{ site.baseurl 
}}/dev/table/functions/udfs.html#table-functions) to provide a way to express 
it.
@@ -127,33 +279,32 @@ This set contains the latest versions of the rows for all 
of the existing primar
 Assuming that we defined a temporal table function `Rates(timeAttribute)` 
based on `RatesHistory` table, we could query such a function in the following 
way:
 
 {% highlight sql %}
-SELECT * FROM Rates('10:15');
+SELECT * FROM Rates('10:15:00');
 
-rowtime currency   rate
-======= ======== ======
-09:00   US Dollar   102
-09:00   Euro        114
-09:00   Yen           1
+rowtime  currency  rate
+=======  ========= ====
+09:00:00 US Dollar 102
+09:00:00 Euro      114
+09:00:00 Yen       1
 
-SELECT * FROM Rates('11:00');
+SELECT * FROM Rates('11:00:00');
 
-rowtime currency   rate
-======= ======== ======
-09:00   US Dollar   102
-10:45   Euro        116
-09:00   Yen           1
+rowtime  currency  rate
+======== ========= ====
+09:00:00 US Dollar 102
+10:45:00 Euro      116
+09:00:00 Yen       1
 {% endhighlight %}
 
 Each query to `Rates(timeAttribute)` would return the state of the `Rates` for 
the given `timeAttribute`.
 
-**Note**: Currently, Flink doesn't support directly querying the temporal 
table functions with a constant time attribute parameter. At the moment, 
temporal table functions can only be used in joins.
-The example above was used to provide an intuition about what the function 
`Rates(timeAttribute)` returns.
+**Note**: Currently, Flink doesn't support directly querying the temporal 
table functions with a constant time attribute parameter. The above example was 
used to provide an intuition about what the function `Rates(timeAttribute)` 
returns.
 
 See also the page about [joins for continuous queries](joins.html) for more 
information about how to join with a temporal table.
 
 ### Defining Temporal Table Function
 
-The following code snippet illustrates how to create a temporal table function 
from an append-only table.
+The following code snippet illustrates how to create a temporal table by 
temporal table function from an append-only table.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -216,93 +367,10 @@ tEnv.registerFunction("Rates", rates)                     
                     /
 </div>
 </div>
 
-Line `(1)` creates a `rates` [temporal table 
function](#temporal-table-functions),
+Line `(1)` creates a `rates` [temporal table 
function](#temporal-table-function),
 which allows us to use the function `rates` in the [Table 
API](../tableApi.html#joins).
 
 Line `(2)` registers this function under the name `Rates` in our table 
environment,
 which allows us to use the `Rates` function in [SQL]({{ site.baseurl 
}}/dev/table/sql/queries.html#joins).
 
-## Temporal Table
-
-<span class="label label-danger">Attention</span> This is only supported in 
Blink planner.
-
-In order to access data in temporal table, currently one must define a 
`TableSource` with `LookupableTableSource`. Flink uses the SQL syntax of `FOR 
SYSTEM_TIME AS OF` to query temporal table, which is proposed in SQL:2011.
-
-Assuming that we defined a temporal table called `LatestRates`, we can query 
such a table in the following way:
-
-{% highlight sql %}
-SELECT * FROM LatestRates FOR SYSTEM_TIME AS OF TIME '10:15';
-
-currency   rate
-======== ======
-US Dollar   102
-Euro        114
-Yen           1
-
-SELECT * FROM LatestRates FOR SYSTEM_TIME AS OF TIME '11:00';
-
-currency   rate
-======== ======
-US Dollar   102
-Euro        116
-Yen           1
-{% endhighlight %}
-
-**Note**: Currently, Flink doesn't support directly querying the temporal 
table with a constant time. At the moment, temporal table can only be used in 
joins. The example above is used to provide an intuition about what the 
temporal table `LatestRates` returns.
-
-See also the page about [joins for continuous queries](joins.html) for more 
information about how to join with a temporal table.
-
-### Defining Temporal Table
-
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-// Get the stream and table environments.
-StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-EnvironmentSettings settings = EnvironmentSettings.newInstance().build();
-StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
-// or TableEnvironment tEnv = TableEnvironment.create(settings);
-
-// Define an HBase table with DDL, then we can use it as a temporal table in 
sql
-// Column 'currency' is the rowKey in HBase table
-tEnv.executeSql(
-    "CREATE TABLE LatestRates (" +
-    "   currency STRING," +
-    "   fam1 ROW<rate DOUBLE>" +
-    ") WITH (" +
-    "   'connector' = 'hbase-1.4'," +
-    "   'table-name' = 'Rates'," +
-    "   'zookeeper.quorum' = 'localhost:2181'" +
-    ")");
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-// Get the stream and table environments.
-val env = StreamExecutionEnvironment.getExecutionEnvironment
-val settings = EnvironmentSettings.newInstance().build()
-val tEnv = StreamTableEnvironment.create(env, settings)
-// or val tEnv = TableEnvironment.create(settings)
-
-// Define an HBase table with DDL, then we can use it as a temporal table in 
sql
-// Column 'currency' is the rowKey in HBase table
-tEnv.executeSql(
-    s"""
-       |CREATE TABLE LatestRates (
-       |    currency STRING,
-       |    fam1 ROW<rate DOUBLE>
-       |) WITH (
-       |    'connector' = 'hbase-1.4',
-       |    'table-name' = 'Rates',
-       |    'zookeeper.quorum' = 'localhost:2181'
-       |)
-       |""".stripMargin)
-
-{% endhighlight %}
-</div>
-</div>
-
-See also the page about [how to define 
LookupableTableSource](../sourceSinks.html#defining-a-tablesource-for-lookups).
-
 {% top %}
diff --git a/docs/dev/table/streaming/temporal_tables.zh.md 
b/docs/dev/table/streaming/temporal_tables.zh.md
index df929d9..d7be911 100644
--- a/docs/dev/table/streaming/temporal_tables.zh.md
+++ b/docs/dev/table/streaming/temporal_tables.zh.md
@@ -22,13 +22,15 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-时态表(Temporal Table)代表基于表的(参数化)视图概念,该表记录变更历史,该视图返回表在某个特定时间点的内容。
+时态表(Temporal Table)是一张随时间变化的表 -- 在 Flink 
中称为[动态表](dynamic_tables.html),时态表中的每条记录都关联了一个或多个时间段,所有的 Flink 表都是时态的(动态的)。
 
-变更表可以是跟踪变化的历史记录表(例如数据库变更日志),也可以是有具体更改的维表(例如数据库表)。
+时态表包含表的一个或多个有版本的表快照,时态表可以是一张跟踪所有变更记录的表(例如数据库表的 
changelog,包含多个表快照),也可以是物化所有变更之后的表(例如数据库表,只有最新表快照)。
 
-对于记录变更历史的表,Flink 可以追踪这些变化,并且允许查询这张表在某个特定时间点的内容。在 Flink 中,这类表由*时态表函数(Temporal 
Table Function)*表示。
+**版本**: 
时态表可以划分成一系列带版本的表快照集合,表快照中的版本代表了快照中所有记录的有效区间,有效区间的开始时间和结束时间可以通过用户指定,根据时态表是否可以追踪自身的历史版本与否,时态表可以分为
 `版本表` 和 `普通表`。 
 
-对于变化的维表,Flink 允许查询这张表在处理时的内容,在 Flink 中,此类表由*时态表(Temporal Table)*表示。
+**版本表**: 如果时态表中的记录可以追踪和并访问它的历史版本,这种表我们称之为版本表,来自数据库的 changelog 可以定义成版本表。
+ 
+**普通表**: 如果时态表中的记录仅仅可以追踪并和它的最新版本,这种表我们称之为普通表,来自数据库 或 HBase 的表可以定义成普通表。
 
 * This will be replaced by the TOC
 {:toc}
@@ -36,91 +38,228 @@ under the License.
 设计初衷
 ----------
 
-### 与记录变更历史的表相关
+### 关联一张版本表
 
-假设我们有表 `RatesHistory` 如下所示。
+以订单流关联产品表这个场景举例,`orders` 表包含了来自 Kafka 的实时订单流,`product_changelog` 表来自数据库表 
`products` 的 changelog , 产品的价格在数据库表 `products` 中是随时间实时变化的。
 
 {% highlight sql %}
-SELECT * FROM RatesHistory;
+SELECT * FROM product_changelog;
+
+(changelog kind)  update_time  product_id product_name price
+================= ===========  ========== ============ ===== 
++(INSERT)         00:01:00     p_001      scooter      11.11
++(INSERT)         00:02:00     p_002      basketball   23.11
+-(UPDATE_BEFORE)  12:00:00     p_001      scooter      11.11
++(UPDATE_AFTER)   12:00:00     p_001      scooter      12.99
+-(UPDATE_BEFORE)  12:00:00     p_002      basketball   23.11 
++(UPDATE_AFTER)   12:00:00     p_002      basketball   19.99
+-(DELETE)         18:00:00     p_001      scooter      12.99 
+{% endhighlight %}
 
-rowtime currency   rate
-======= ======== ======
-09:00   US Dollar   102
-09:00   Euro        114
-09:00   Yen           1
-10:45   Euro        116
-11:15   Euro        119
-11:49   Pounds      108
+表 `product_changelog` 表示数据库表 `products`不断增长的 changelog, 比如,产品 `scooter` 在时间点 
`00:01:00`的初始价格是 `11.11`, 在 `12:00:00` 的时候涨价到了 `12.99`, 
+在 `18:00:00` 的时候这条产品价格记录被删除。
+ 
+如果我们想输出 `product_changelog` 表在 `10:00:00` 对应的版本,表的内容如下所示:
+{% highlight sql %}
+update_time  product_id product_name price
+===========  ========== ============ ===== 
+00:01:00     p_001      scooter      11.11
+00:02:00     p_002      basketball   23.11
 {% endhighlight %}
 
-`RatesHistory` 代表一个兑换日元货币汇率表(日元汇率为1),该表是不断增长的 append-only 表。例如,`欧元`兑`日元`从 
`09:00` 到 `10:45` 的汇率为 `114`。从 `10:45` 到 `11:15`,汇率为 `116`。
+如果我们想输出 `product_changelog` 表在 `13:00:00` 对应的版本,表的内容如下所示:
+{% highlight sql %}
+update_time  product_id product_name price
+===========  ========== ============ ===== 
+12:00:00     p_001      scooter      12.99
+12:00:00     p_002      basketball   19.99
+{% endhighlight %}
+
+上述例子中,`products` 表的版本是通过 `update_time` 和 `product_id` 进行追踪的,`product_id` 对应 
`product_changelog` 表的主键,`update_time` 对应事件时间。
+
+在 Flink 中, 这由[*版本表*](#声明版本表)表示。
+
+### 关联一张普通表
+
+另一方面,某些用户案列需要连接变化的维表,该表是外部数据库表。
 
-假设我们要输出 `10:58` 的所有当前汇率,则需要以下 SQL 查询来计算结果表:
+假设 `LatestRates` 是一个物化的最新汇率表 (比如:一张 HBase 表),`LatestRates` 总是表示 HBase 表 
`Rates` 的最新内容。
 
+我们在 `10:15:00` 时查询到的内容如下所示:
 {% highlight sql %}
-SELECT *
-FROM RatesHistory AS r
-WHERE r.rowtime = (
-  SELECT MAX(rowtime)
-  FROM RatesHistory AS r2
-  WHERE r2.currency = r.currency
-  AND r2.rowtime <= TIME '10:58');
+10:15:00 > SELECT * FROM LatestRates;
+
+currency  rate
+========= ====
+US Dollar 102
+Euro      114
+Yen       1
 {% endhighlight %}
 
-子查询确定对应货币的最大时间小于或等于所需时间。外部查询列出具有最大时间戳的汇率。 
- 
-下表显示了这种计算的结果。我们的示例中,在 `10:58` 时表的内容,考虑了 `10:45` 时`欧元`的更新,但未考虑 `11:15` 
时的`欧元`更新和`英镑`的新值。
-
-{% highlight text %}
-rowtime currency   rate
-======= ======== ======
-09:00   US Dollar   102
-09:00   Yen           1
-10:45   Euro        116
+我们在 `11:00:00` 时查询到的内容如下所示:
+{% highlight sql %}
+11:00:00 > SELECT * FROM LatestRates;
+
+currency  rate
+========= ====
+US Dollar 102
+Euro      116
+Yen       1
+{% endhighlight %}
+
+在 Flink 中, 这由[*普通表*](#声明普通表)表示。
+
+时态表
+-----
+<span class="label label-danger">注意</span> 仅 Blink planner 支持此功能。
+
+Flink 使用主键约束 和 event time 来 定义一张版本表和版本视图。
+
+### 声明版本表
+在 Flink 中,定义了主键约束和 event time 属性的表就是版本表。
+{% highlight sql %}
+-- 定义一张版本表
+CREATE TABLE product_changelog (
+  product_id STRING,
+  product_name STRING,
+  product_price DECIMAL(10, 4),
+  update_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
+  PRIMARY KEY(product_id) NOT ENFORCED,      -- (1) 定义主键约束
+  WATERMARK FOR update_time AS update_time   -- (2) 通过 watermark 定义 event time 
                     
+) WITH (
+  'connector' = 'kafka',
+  'topic' = 'products',
+  'scan.startup.mode' = 'earliest-offset',
+  'properties.bootstrap.servers' = 'localhost:9092',
+  'value.format' = 'debezium-json'
+);
 {% endhighlight %}
 
- *时态表*的概念旨在简化此类查询,加快其执行速度,并减少 Flink 的状态使用。*时态表*是 append-only 表上的参数化视图,该视图将 
append-only 表的行解释为表的变更日志,并在特定时间点提供该表的版本。将 append-only 
表解释为变更日志需要指定主键属性和时间戳属性。主键确定哪些行将被覆盖,时间戳确定行有效的时间。
+行 `(1)` 为表 `product_changelog` 定义了主键, 行 `(2)` 把 `update_time` 定义为表 
`product_changelog` 的 event time,因此 `product_changelog` 是一张版本表。
 
-在上面的示例中,`currency` 是 `RatesHistory` 表的主键,而 `rowtime` 是时间戳属性。
+**注意**: `METADATA FROM 'value.source.timestamp' VIRTUAL` 语法的意思是从每条 changelog 
中抽取 changelog 对应的数据库表中操作的执行时间,强烈推荐使用数据库表中操作的
+执行时间作为 event time ,否则通过时间抽取的版本可能和数据库中的版本不匹配。
+ 
+### 声明版本视图
 
-在 Flink 中,这由[*时态表函数*](#temporal-table-function)表示。
+Flink 也支持定义版本视图只要一个视图包含主键和 event time 便是一个版本视图。
 
-### 与维表变化相关
+假设我们有表 `RatesHistory` 如下所示:
+{% highlight sql %}
+-- 定义一张 append-only 表
+CREATE TABLE RatesHistory (
+    currency_time TIMESTAMP(3),
+    currency STRING,
+    rate DECIMAL(38, 10),
+    WATERMARK FOR currency_time AS currency_time   -- 定义 event time
+) WITH (
+  'connector' = 'kafka',
+  'topic' = 'rates',
+  'scan.startup.mode' = 'earliest-offset',
+  'properties.bootstrap.servers' = 'localhost:9092',
+  'format' = 'json'                                -- 普通的 append-only 流
+)
+{% endhighlight %}
 
-另一方面,某些用例需要连接变化的维表,该表是外部数据库表。
+表 `RatesHistory` 代表一个兑换日元货币汇率表(日元汇率为1),该表是不断增长的 append-only 表。
+例如,`欧元` 兑换 `日元` 从 `09:00:00` 到 `10:45:00` 的汇率为 `114`。从 `10:45:00` 到 `11:15:00` 
的汇率为 `116`。
 
-假设 `LatestRates` 是一个被物化的最新汇率表。`LatestRates` 是物化的 `RatesHistory` 历史。那么 
`LatestRates` 表在 `10:58` 的内容将是:
+{% highlight sql %}
+SELECT * FROM RatesHistory;
 
-{% highlight text %}
-10:58> SELECT * FROM LatestRates;
-currency   rate
-======== ======
-US Dollar   102
-Yen           1
-Euro        116
+currency_time currency  rate
+============= ========= ====
+09:00:00      US Dollar 102
+09:00:00      Euro      114
+09:00:00      Yen       1
+10:45:00      Euro      116
+11:15:00      Euro      119
+11:49:00      Pounds    108
 {% endhighlight %}
 
-`12:00` 时 `LatestRates` 表的内容将是: 
+为了在 `RatesHistory` 上定义版本表,Flink 支持通过[去重查询]({{ site.baseurl 
}}/zh/dev/table/sql/queries.html#去重)定义版本视图,
+去重查询可以产出一个有序的 changelog 流,去重查询能够推断主键并保留原始数据流的 event time 属性。
 
-{% highlight text %}
-12:00> SELECT * FROM LatestRates;
-currency   rate
-======== ======
-US Dollar   102
-Yen           1
-Euro        119
-Pounds      108
+{% highlight sql %}
+CREATE VIEW versioned_rates AS              
+SELECT currency, rate, currency_time            -- (1) `currency_time` 保留了 
event time
+  FROM (
+      SELECT *,
+      ROW_NUMBER() OVER (PARTITION BY currency  -- (2) `currency` 是去重 query 的 
unique key,可以作为主键
+         ORDER BY currency_time DESC) AS rowNum 
+      FROM RatesHistory )
+WHERE rowNum = 1; 
+
+-- 视图 `versioned_rates` 将会产出如下的 changelog:
+
+(changelog kind) currency_time currency   rate
+================ ============= =========  ====
++(INSERT)        09:00:00      US Dollar  102
++(INSERT)        09:00:00      Euro       114
++(INSERT)        09:00:00      Yen        1
++(UPDATE_AFTER)  10:45:00      Euro       116
++(UPDATE_AFTER)  11:15:00      Euro       119
++(INSERT)        11:49:00      Pounds     108
+{% endhighlight sql %}
+
+行 `(1)` 保留了 event time 作为视图 `versioned_rates` 的 event time,行 `(2)` 使得视图 
`versioned_rates` 有了主键, 因此视图 `versioned_rates` 是一个版本视图。
+
+视图中的去重 query 会被 Flink 优化并高效地产出 changelog stream, 产出的 changelog 保留了主键约束和 event 
time。
+ 
+如果我们想输出 `versioned_rates` 表在 `11:00:00` 对应的版本,表的内容如下所示:
+{% highlight sql %}
+currency_time currency   rate  
+============= ========== ====
+09:00:00      US Dollar  102
+09:00:00      Yen        1
+10:45:00      Euro       116
 {% endhighlight %}
 
-在 Flink 中,这由[*时态表*](#temporal-table)表示。
+如果我们想输出 `versioned_rates` 表在 `12:00:00` 对应的版本,表的内容如下所示:
+{% highlight sql %}
+currency_time currency   rate  
+============= ========== ====
+09:00:00      US Dollar  102
+09:00:00      Yen        1
+10:45:00      Euro       119
+11:49:00      Pounds     108
+{% endhighlight %}
+
+### 声明普通表
+ 
+普通表的声明和 Flink 建表 DDL 一致,参考 [create table]({{ site.baseurl 
}}/zh/dev/table/sql/create.html#create-table) 页面获取更多如何建表的信息。
+ 
+{% highlight sql %}
+// 用 DDL 定义一张 HBase 表,然后我们可以在 SQL 中将其当作一张时态表使用
+// 'currency' 列是 HBase 表中的 rowKey
+ CREATE TABLE LatestRates (   
+     currency STRING,   
+     fam1 ROW<rate DOUBLE>   
+ ) WITH (   
+    'connector' = 'hbase-1.4',   
+    'table-name' = 'rates',   
+    'zookeeper.quorum' = 'localhost:2181'   
+ );
+{% endhighlight %}
+
+<span class="label label-danger">注意</span> 理论上讲任意都能用作时态表并在基于处理时间的时态表 Join 
中使用,但当前支持作为时态表的普通表必须实现接口 `LookupableTableSource`。接口 `LookupableTableSource` 
的实例只能作为时态表用于基于处理时间的时态 Join 。查看此页面获取更多关于[如何实现接口 `LookupableTableSource`]({%link 
dev/table/sourceSinks.zh.md %}#defining-a-tablesource-for-lookups) 的详细内容。
+
+通过 `LookupableTableSource` 定义的表意味着该表具备了在运行时通过一个或多个 key 去查询外部存储系统的能力,当前支持在 
Processing-time 时态表 join 使用的表 包括
+[JDBC]({{ site.baseurl }}/zh/dev/table/connectors/jdbc.html), [HBase]({{ 
site.baseurl }}/zh/dev/table/connectors/hbase.html) 
+and [Hive]({{ site.baseurl 
}}/zh/dev/table/hive/hive_streaming.html#hive-table-as-temporal-tables)。
+
+另请参阅有关[如何定义 LookupableTableSource 
](../sourceSinks.html#defining-a-tablesource-for-lookups)的页面。
 
-<a name="temporal-table-function"></a>
+在基于处理时间的时态表 Join 中支持任意表作为时态表会在不远的将来支持。
 
 时态表函数
 ------------------------
+时态表函数是一种过时的方式去定义时态表并关联时态表的数据,现在我们可以用[时态表 DDL](#时态表-ddl)去定义时态表,用[时态表 
Join](#时态表-join) 语法去关联时态表。 
 
+时态表函数和时态表 DDL 最大的区别在于,时态表 DDL 可以在纯 SQL 环境中使用但是时态表函数不支持,用时态表 DDL 声明的时态表支持 
changelog 流和 append-only 流但时态表函数仅支持 append-only 流。
+ 
 为了访问时态表中的数据,必须传递一个[时间属性](time_attributes.html),该属性确定将要返回的表的版本。
-Flink 使用[表函数]({{ site.baseurl 
}}/zh/dev/table/functions/udfs.html#table-functions)的 SQL 语法提供一种表达它的方法。
+Flink 使用[表函数]({{ site.baseurl }}/zh/dev/table/functions/udfs.html#表值函数)的 SQL 
语法提供一种表达它的方法。
 
 定义后,*时态表函数*将使用单个时间参数 timeAttribute 并返回一个行集合。
 该集合包含相对于给定时间属性的所有现有主键的行的最新版本。
@@ -128,21 +267,21 @@ Flink 使用[表函数]({{ site.baseurl 
}}/zh/dev/table/functions/udfs.html#tabl
 假设我们基于 `RatesHistory` 表定义了一个时态表函数,我们可以通过以下方式查询该函数 `Rates(timeAttribute)`: 
 
 {% highlight sql %}
-SELECT * FROM Rates('10:15');
+SELECT * FROM Rates('10:15:00');
 
-rowtime currency   rate
-======= ======== ======
-09:00   US Dollar   102
-09:00   Euro        114
-09:00   Yen           1
+rowtime  currency  rate
+=======  ========= ====
+09:00:00 US Dollar 102
+09:00:00 Euro      114
+09:00:00 Yen       1
 
-SELECT * FROM Rates('11:00');
+SELECT * FROM Rates('11:00:00');
 
-rowtime currency   rate
-======= ======== ======
-09:00   US Dollar   102
-10:45   Euro        116
-09:00   Yen           1
+rowtime  currency  rate
+======== ========= ====
+09:00:00 US Dollar 102
+10:45:00 Euro      116
+09:00:00 Yen       1
 {% endhighlight %}
 
 对 `Rates(timeAttribute)` 的每个查询都将返回给定 `timeAttribute` 的 `Rates` 状态。
@@ -221,88 +360,4 @@ tEnv.registerFunction("Rates", rates)                      
                    /
 
 行`(2)`在表环境中注册名称为 `Rates` 的函数,这使我们可以在[ SQL ]({{ site.baseurl 
}}/zh/dev/table/sql/queries.html#joins)中使用 `Rates` 函数。
 
-<a name="temporal-table"></a>
-
-## 时态表
-
-<span class="label label-danger">注意</span> 仅 Blink planner 支持此功能。
-
-为了访问时态表中的数据,当前必须使用 `LookupableTableSource` 定义一个 `TableSource`。Flink 使用 
SQL:2011 中提出的 `FOR SYSTEM_TIME AS OF` 的 SQL 语法查询时态表。
-
-假设我们定义了一个时态表 `LatestRates`,我们可以通过以下方式查询此表:
-
-{% highlight sql %}
-SELECT * FROM LatestRates FOR SYSTEM_TIME AS OF TIME '10:15';
-
-currency   rate
-======== ======
-US Dollar   102
-Euro        114
-Yen           1
-
-SELECT * FROM LatestRates FOR SYSTEM_TIME AS OF TIME '11:00';
-
-currency   rate
-======== ======
-US Dollar   102
-Euro        116
-Yen           1
-{% endhighlight %}
-
-**注意**:当前,Flink 不支持以固定时间直接查询时态表。目前,时态表只能在 join 中使用。上面的示例用于为时态表 `LatestRates` 
返回内容提供直观信息。
-
-另请参阅有关[用于持续查询的 join ](joins.html)页面,以获取有关如何与时态表 join 的更多信息。
-
-### 定义时态表
-
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-// 获取 stream 和 table 环境
-StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-EnvironmentSettings settings = EnvironmentSettings.newInstance().build();
-StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
-// or TableEnvironment tEnv = TableEnvironment.create(settings);
-
-// 用 DDL 定义一张 HBase 表,然后我们可以在 SQL 中将其当作一张时态表使用
-// 'currency' 列是 HBase 表中的 rowKey
-tEnv.executeSql(
-    "CREATE TABLE LatestRates (" +
-    "   currency STRING," +
-    "   fam1 ROW<rate DOUBLE>" +
-    ") WITH (" +
-    "   'connector' = 'hbase-1.4'," +
-    "   'table-name' = 'Rates'," +
-    "   'zookeeper.quorum' = 'localhost:2181'" +
-    ")");
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-// 获取 stream 和 table 环境
-val env = StreamExecutionEnvironment.getExecutionEnvironment
-val settings = EnvironmentSettings.newInstance().build()
-val tEnv = StreamTableEnvironment.create(env, settings)
-// or val tEnv = TableEnvironment.create(settings)
-
-// 用 DDL 定义一张 HBase 表,然后我们可以在 SQL 中将其当作一张时态表使用
-// 'currency' 列是 HBase 表中的 rowKey
-tEnv.executeSql(
-    s"""
-       |CREATE TABLE LatestRates (
-       |    currency STRING,
-       |    fam1 ROW<rate DOUBLE>
-       |) WITH (
-       |    'connector' = 'hbase-1.4',
-       |    'table-name' = 'Rates',
-       |    'zookeeper.quorum' = 'localhost:2181'
-       |)
-       |""".stripMargin)
-{% endhighlight %}
-</div>
-</div>
-
-另请参阅有关[如何定义 LookupableTableSource 
](../sourceSinks.html#defining-a-tablesource-for-lookups)的页面。
-
 {% top %}

Reply via email to