sjwiesman commented on a change in pull request #14152:
URL: https://github.com/apache/flink/pull/14152#discussion_r528878500



##########
File path: docs/dev/table/streaming/joins.md
##########
@@ -65,6 +65,220 @@ WHERE o.id = s.orderId AND
 
 Compared to a regular join operation, this kind of join only supports 
append-only tables with time attributes. Since time attributes are 
quasi-monotonic increasing, Flink can remove old values from its state without 
affecting the correctness of the result.
 
+Temporal Joins
+--------------
+<span class="label label-danger">Attention</span> This is only supported in 
Blink planner.

Review comment:
       ```suggestion
   <span class="label label-danger">Attention</span> This feature is only 
supported in Blink planner.
   ```

##########
File path: docs/dev/table/streaming/joins.md
##########
@@ -65,6 +65,220 @@ WHERE o.id = s.orderId AND
 
 Compared to a regular join operation, this kind of join only supports 
append-only tables with time attributes. Since time attributes are 
quasi-monotonic increasing, Flink can remove old values from its state without 
affecting the correctness of the result.
 
+Temporal Joins
+--------------
+<span class="label label-danger">Attention</span> This is only supported in 
Blink planner.
+<span class="label label-danger">Attention</span> Attention It is only 
supported in SQL, and not supported in Table API yet.
+
+Temporal join is an arbitrary table (left input/probe side) correlate with the 
versions of temporal table (right input/build side), The temporal table can be 
an external dimension table that changes over time 
+or a changelog that tracks all history changes. Please check the corresponding 
page for more information about [temporal tables](temporal_tables.html).
+
+Flink uses the SQL syntax of `FOR SYSTEM_TIME AS OF` to query temporal table, 
which is proposed in SQL:2011. The syntax of temporal table join is as follows:
+
+{% highlight sql %}
+SELECT [column_list]
+FROM table1 [AS <alias1>]
+[LEFT] JOIN table2 FOR SYSTEM_TIME AS OF table1.{ proctime | rowtime } [AS 
<alias2>]
+ON table1.column-name1 = table2.column-name1
+{% endhighlight %}
+
+### Processing-time Temporal Joins
+
+Processing-time temporal join is the left input table using it's processing 
time to correlate the latest version of [regular 
table](temporal_tables.html#defining-regular-table) in a temporal join.
+Processing-time temporal join only supports regular table currently, and the 
supported regular table table can only contain append-only stream. 
+
+With a processing-time time attribute, it is impossible to pass _past_ time 
attributes as an argument to the temporal table.

Review comment:
       ```suggestion
   With a processing-time time attribute, it is impossible to use a _past_ time 
attribute as an argument to the temporal table.
   ```

##########
File path: docs/dev/table/streaming/temporal_tables.md
##########
@@ -20,103 +20,255 @@ software distributed under the License is distributed on 
an
 KIND, either express or implied.  See the License for the
 specific language governing permissions and limitations
 under the License.
--->
+--> 
 
-Temporal Tables represent a concept of a (parameterized) view on a changing 
table that returns the content of a table at a specific point in time.
+Temporal table is a table that evolves over time as known as Flink [dynamic 
table](dynamic_tables.html), rows in temporal table are associated with one or 
more temporal periods, all Flink tables are temporal(dynamic) table.

Review comment:
       ```suggestion
   A temporal table is a table that evolves over time -  otherwise known in 
Flink as a [dynamic table](dynamic_tables.html). Rows in a temporal table are 
associated with one or more temporal periods and all Flink tables are 
temporal(dynamic).
   ```

##########
File path: docs/dev/table/streaming/temporal_tables.md
##########
@@ -20,103 +20,255 @@ software distributed under the License is distributed on 
an
 KIND, either express or implied.  See the License for the
 specific language governing permissions and limitations
 under the License.
--->
+--> 
 
-Temporal Tables represent a concept of a (parameterized) view on a changing 
table that returns the content of a table at a specific point in time.
+Temporal table is a table that evolves over time as known as Flink [dynamic 
table](dynamic_tables.html), rows in temporal table are associated with one or 
more temporal periods, all Flink tables are temporal(dynamic) table.
 
-The changing table can either be a changing history table which tracks the 
changes (e.g. database changelogs) or a changing dimension table which 
materializes the changes (e.g. database tables).
+A temporal table contains one or more versioned table snapshots, it can be a 
changing history table which tracks the changes(e.g. database changelog, 
contains all snapshots) or a changing dimensioned table which materializes the 
changes(e.g. database table, contains the latest snapshot). 
 
-For the changing history table, Flink can keep track of the changes and allows 
for accessing the content of the table at a certain point in time within a 
query. In Flink, this kind of table is represented by a *Temporal Table 
Function*.
+**Version**: A temporal table can split into a set of versioned table 
snapshots, the version in table snapshots represents the valid life circle of 
rows, the start time and the end time of the valid period can be assigned by 
users. 
+Temporal table can split to `versioned table` and `regular table` according to 
the table can tracks its history version or not.
 
-For the changing dimension table, Flink allows for accessing the content of 
the table at processing time within a query. In Flink, this kind of table is 
represented by a *Temporal Table*.
+**Versioned table**: If the row in temporal table can track its history 
changes and visit its history versions, we call this kind of temporal table as 
versioned table, the table comes from database changelog can define as a 
versioned table.
+
+**Regular table**: If the row in temporal table can only track and visit its 
latest version,we call this kind of temporal table as regular table. The table 
comes from database or HBase can define as a regular table.

Review comment:
       ```suggestion
   **Versioned table**: If the rows in a temporal table can track its history 
changes and visit its history versions, we call this a versioned table. Tables 
that comes from a database changelog can be defined as a versioned table.
   
   **Regular table**: If the rows in a temporal table can only track and visit 
its latest version, we call this a regular table. Tables that comes from a 
database or HBase can be defined as a regular table.
   ```

##########
File path: docs/dev/table/streaming/joins.md
##########
@@ -65,6 +65,220 @@ WHERE o.id = s.orderId AND
 
 Compared to a regular join operation, this kind of join only supports 
append-only tables with time attributes. Since time attributes are 
quasi-monotonic increasing, Flink can remove old values from its state without 
affecting the correctness of the result.
 
+Temporal Joins
+--------------
+<span class="label label-danger">Attention</span> This is only supported in 
Blink planner.
+<span class="label label-danger">Attention</span> Attention It is only 
supported in SQL, and not supported in Table API yet.
+
+Temporal join is an arbitrary table (left input/probe side) correlate with the 
versions of temporal table (right input/build side), The temporal table can be 
an external dimension table that changes over time 
+or a changelog that tracks all history changes. Please check the corresponding 
page for more information about [temporal tables](temporal_tables.html).
+
+Flink uses the SQL syntax of `FOR SYSTEM_TIME AS OF` to query temporal table, 
which is proposed in SQL:2011. The syntax of temporal table join is as follows:

Review comment:
       ```suggestion
   Flink uses the SQL syntax of `FOR SYSTEM_TIME AS OF` to query temporal 
table, which is proposed in SQL:2011 standard. The syntax of a temporal table 
join is as follows:
   ```

##########
File path: docs/dev/table/streaming/joins.md
##########
@@ -65,6 +65,220 @@ WHERE o.id = s.orderId AND
 
 Compared to a regular join operation, this kind of join only supports 
append-only tables with time attributes. Since time attributes are 
quasi-monotonic increasing, Flink can remove old values from its state without 
affecting the correctness of the result.
 
+Temporal Joins
+--------------
+<span class="label label-danger">Attention</span> This is only supported in 
Blink planner.
+<span class="label label-danger">Attention</span> Attention It is only 
supported in SQL, and not supported in Table API yet.
+
+Temporal join is an arbitrary table (left input/probe side) correlate with the 
versions of temporal table (right input/build side), The temporal table can be 
an external dimension table that changes over time 
+or a changelog that tracks all history changes. Please check the corresponding 
page for more information about [temporal tables](temporal_tables.html).
+
+Flink uses the SQL syntax of `FOR SYSTEM_TIME AS OF` to query temporal table, 
which is proposed in SQL:2011. The syntax of temporal table join is as follows:
+
+{% highlight sql %}
+SELECT [column_list]
+FROM table1 [AS <alias1>]
+[LEFT] JOIN table2 FOR SYSTEM_TIME AS OF table1.{ proctime | rowtime } [AS 
<alias2>]
+ON table1.column-name1 = table2.column-name1
+{% endhighlight %}
+
+### Processing-time Temporal Joins
+
+Processing-time temporal join is the left input table using it's processing 
time to correlate the latest version of [regular 
table](temporal_tables.html#defining-regular-table) in a temporal join.

Review comment:
       ```suggestion
   Processing-time temporal join uses the left input tables processing time 
attribute to correlate the latest version of a [regular 
table](temporal_tables.html#defining-regular-table).
   ```

##########
File path: docs/dev/table/streaming/joins.md
##########
@@ -65,6 +65,220 @@ WHERE o.id = s.orderId AND
 
 Compared to a regular join operation, this kind of join only supports 
append-only tables with time attributes. Since time attributes are 
quasi-monotonic increasing, Flink can remove old values from its state without 
affecting the correctness of the result.
 
+Temporal Joins
+--------------
+<span class="label label-danger">Attention</span> This is only supported in 
Blink planner.
+<span class="label label-danger">Attention</span> Attention It is only 
supported in SQL, and not supported in Table API yet.
+
+Temporal join is an arbitrary table (left input/probe side) correlate with the 
versions of temporal table (right input/build side), The temporal table can be 
an external dimension table that changes over time 
+or a changelog that tracks all history changes. Please check the corresponding 
page for more information about [temporal tables](temporal_tables.html).
+
+Flink uses the SQL syntax of `FOR SYSTEM_TIME AS OF` to query temporal table, 
which is proposed in SQL:2011. The syntax of temporal table join is as follows:
+
+{% highlight sql %}
+SELECT [column_list]
+FROM table1 [AS <alias1>]
+[LEFT] JOIN table2 FOR SYSTEM_TIME AS OF table1.{ proctime | rowtime } [AS 
<alias2>]
+ON table1.column-name1 = table2.column-name1
+{% endhighlight %}
+
+### Processing-time Temporal Joins
+
+Processing-time temporal join is the left input table using it's processing 
time to correlate the latest version of [regular 
table](temporal_tables.html#defining-regular-table) in a temporal join.
+Processing-time temporal join only supports regular table currently, and the 
supported regular table table can only contain append-only stream. 
+
+With a processing-time time attribute, it is impossible to pass _past_ time 
attributes as an argument to the temporal table.
+By definition, it is always the current timestamp. Thus, invocations of 
correlating temporal table will always return the latest known versions of the 
underlying table and any updates in the underlying history table will also 
immediately overwrite the current values.
+
+One can think about a processing-time temporal join as a simple `HashMap<K, 
V>` that stores all of the records from the build side.
+When a new record from the build side has the same key as some previous 
record, the old value is just simply overwritten.
+Every record from the probe side is always evaluated against the most 
recent/current state of the `HashMap`.
+
+The following processing-time temporal table join example shows an append-only 
table `orders` that should be joined with the table `LatestRates`,
+`LatestRates` is a dimension table (e.g. HBase table) that is materialized 
with the latest rate. At time `10:15`, `10:30`, `10:52`, the content of 
`LatestRates` looks as follows:
+
+{% highlight sql %}
+10:15> SELECT * FROM LatestRates;
+
+currency   rate
+======== ======
+US Dollar   102
+Euro        114
+Yen           1
+
+10:30> SELECT * FROM LatestRates;
+
+currency   rate
+======== ======
+US Dollar   102
+Euro        114
+Yen           1
+
+10:52> SELECT * FROM LatestRates;
+
+currency   rate
+======== ======
+US Dollar   102
+Euro        116     <==== changed from 114 to 116
+Yen           1
+{% endhighlight %}
+
+The content of `LastestRates` at time `10:15` and `10:30` are equal. The Euro 
rate has changed from 114 to 116 at `10:52`.
+
+`Orders` is an append-only table that represents payments for the given 
`amount` and the given `currency`. For example at `10:15` there was an order 
for an amount of `2 Euro`.
+
+{% highlight sql %}
+SELECT * FROM Orders;
+
+amount currency
+====== =========
+     2 Euro             <== arrived at time 10:15
+     1 US Dollar        <== arrived at time 10:30
+     2 Euro             <== arrived at time 10:52
+{% endhighlight %}
+
+Given that we would like to calculate the amount of all `Orders` converted to 
a common currency (`Yen`).
+
+For example, we would like to convert the following orders using the latest 
rate in `LatestRates`. The result would be:
+
+{% highlight text %}
+amount currency     rate   amout*rate
+====== ========= ======= ============
+     2 Euro          114          228    <== arrived at time 10:15
+     1 US Dollar     102          102    <== arrived at time 10:30
+     2 Euro          116          232    <== arrived at time 10:52
+{% endhighlight %}
+
+
+With the help of temporal table join, we can express such a query in SQL as:
+
+{% highlight sql %}
+SELECT
+  o.amout, o.currency, r.rate, o.amount * r.rate
+FROM
+  Orders AS o
+  JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
+  ON r.currency = o.currency
+{% endhighlight %}
+
+Each record from the probe side will be joined with the current version of the 
build side table. In our example, the query is using the processing-time 
notion, so a newly appended order would always be joined with the most recent 
version of `LatestRates` when executing the operation.
+Note that the result is not deterministic for processing-time.
+The processing-time temporal join is usually used to enrich the stream with 
external table (i.e. dimension table).
+
+### Event-time Temporal Joins
+
+Event-time temporal join is the left input table using it's event time to 
correlate the corresponding version of [versioned 
table](temporal_tables.html#defining-versioned-table) in a temporal join.
+Event-time temporal join only supports versioned table, the versioned table 
table need to be a changelog stream, the append-only stream can convert to 
changelog stream in Flink, thus the versioned table can come from an 
append-only stream,

Review comment:
       ```suggestion
   Event-time temporal join only supports versioned tables and the versioned 
tables need to be a changelog stream. However, an append-only stream can be 
converted to a changelog stream in Flink, thus the versioned table can come 
from an append-only stream,
   ```

##########
File path: docs/dev/table/streaming/joins.md
##########
@@ -65,6 +65,220 @@ WHERE o.id = s.orderId AND
 
 Compared to a regular join operation, this kind of join only supports 
append-only tables with time attributes. Since time attributes are 
quasi-monotonic increasing, Flink can remove old values from its state without 
affecting the correctness of the result.
 
+Temporal Joins
+--------------
+<span class="label label-danger">Attention</span> This is only supported in 
Blink planner.
+<span class="label label-danger">Attention</span> Attention It is only 
supported in SQL, and not supported in Table API yet.
+
+Temporal join is an arbitrary table (left input/probe side) correlate with the 
versions of temporal table (right input/build side), The temporal table can be 
an external dimension table that changes over time 
+or a changelog that tracks all history changes. Please check the corresponding 
page for more information about [temporal tables](temporal_tables.html).
+
+Flink uses the SQL syntax of `FOR SYSTEM_TIME AS OF` to query temporal table, 
which is proposed in SQL:2011. The syntax of temporal table join is as follows:
+
+{% highlight sql %}
+SELECT [column_list]
+FROM table1 [AS <alias1>]
+[LEFT] JOIN table2 FOR SYSTEM_TIME AS OF table1.{ proctime | rowtime } [AS 
<alias2>]
+ON table1.column-name1 = table2.column-name1
+{% endhighlight %}
+
+### Processing-time Temporal Joins
+
+Processing-time temporal join is the left input table using it's processing 
time to correlate the latest version of [regular 
table](temporal_tables.html#defining-regular-table) in a temporal join.
+Processing-time temporal join only supports regular table currently, and the 
supported regular table table can only contain append-only stream. 
+
+With a processing-time time attribute, it is impossible to pass _past_ time 
attributes as an argument to the temporal table.
+By definition, it is always the current timestamp. Thus, invocations of 
correlating temporal table will always return the latest known versions of the 
underlying table and any updates in the underlying history table will also 
immediately overwrite the current values.
+
+One can think about a processing-time temporal join as a simple `HashMap<K, 
V>` that stores all of the records from the build side.
+When a new record from the build side has the same key as some previous 
record, the old value is just simply overwritten.
+Every record from the probe side is always evaluated against the most 
recent/current state of the `HashMap`.
+
+The following processing-time temporal table join example shows an append-only 
table `orders` that should be joined with the table `LatestRates`,
+`LatestRates` is a dimension table (e.g. HBase table) that is materialized 
with the latest rate. At time `10:15`, `10:30`, `10:52`, the content of 
`LatestRates` looks as follows:
+
+{% highlight sql %}
+10:15> SELECT * FROM LatestRates;
+
+currency   rate
+======== ======
+US Dollar   102
+Euro        114
+Yen           1
+
+10:30> SELECT * FROM LatestRates;
+
+currency   rate
+======== ======
+US Dollar   102
+Euro        114
+Yen           1
+
+10:52> SELECT * FROM LatestRates;
+
+currency   rate
+======== ======
+US Dollar   102
+Euro        116     <==== changed from 114 to 116
+Yen           1
+{% endhighlight %}
+
+The content of `LastestRates` at time `10:15` and `10:30` are equal. The Euro 
rate has changed from 114 to 116 at `10:52`.
+
+`Orders` is an append-only table that represents payments for the given 
`amount` and the given `currency`. For example at `10:15` there was an order 
for an amount of `2 Euro`.
+
+{% highlight sql %}
+SELECT * FROM Orders;
+
+amount currency
+====== =========
+     2 Euro             <== arrived at time 10:15
+     1 US Dollar        <== arrived at time 10:30
+     2 Euro             <== arrived at time 10:52
+{% endhighlight %}
+
+Given that we would like to calculate the amount of all `Orders` converted to 
a common currency (`Yen`).
+
+For example, we would like to convert the following orders using the latest 
rate in `LatestRates`. The result would be:
+
+{% highlight text %}
+amount currency     rate   amout*rate
+====== ========= ======= ============
+     2 Euro          114          228    <== arrived at time 10:15
+     1 US Dollar     102          102    <== arrived at time 10:30
+     2 Euro          116          232    <== arrived at time 10:52
+{% endhighlight %}
+
+
+With the help of temporal table join, we can express such a query in SQL as:
+
+{% highlight sql %}
+SELECT
+  o.amout, o.currency, r.rate, o.amount * r.rate
+FROM
+  Orders AS o
+  JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
+  ON r.currency = o.currency
+{% endhighlight %}
+
+Each record from the probe side will be joined with the current version of the 
build side table. In our example, the query is using the processing-time 
notion, so a newly appended order would always be joined with the most recent 
version of `LatestRates` when executing the operation.
+Note that the result is not deterministic for processing-time.
+The processing-time temporal join is usually used to enrich the stream with 
external table (i.e. dimension table).
+
+### Event-time Temporal Joins
+
+Event-time temporal join is the left input table using it's event time to 
correlate the corresponding version of [versioned 
table](temporal_tables.html#defining-versioned-table) in a temporal join.
+Event-time temporal join only supports versioned table, the versioned table 
table need to be a changelog stream, the append-only stream can convert to 
changelog stream in Flink, thus the versioned table can come from an 
append-only stream,
+please see [versioned view](temporal_tables.html#defining-versioned-view) for 
more information to know how to define a versioned table from an append-only 
stream.
+
+With an event-time time attribute (i.e., a rowtime attribute), it is possible 
to pass _past_ time attributes to the temporal table.

Review comment:
       ```suggestion
   With an event-time time attribute (i.e., a rowtime attribute), it is 
possible to use a _past_ time attribute with the temporal table.
   ```

##########
File path: docs/dev/table/streaming/joins.md
##########
@@ -65,6 +65,220 @@ WHERE o.id = s.orderId AND
 
 Compared to a regular join operation, this kind of join only supports 
append-only tables with time attributes. Since time attributes are 
quasi-monotonic increasing, Flink can remove old values from its state without 
affecting the correctness of the result.
 
+Temporal Joins
+--------------
+<span class="label label-danger">Attention</span> This is only supported in 
Blink planner.
+<span class="label label-danger">Attention</span> Attention It is only 
supported in SQL, and not supported in Table API yet.
+
+Temporal join is an arbitrary table (left input/probe side) correlate with the 
versions of temporal table (right input/build side), The temporal table can be 
an external dimension table that changes over time 
+or a changelog that tracks all history changes. Please check the corresponding 
page for more information about [temporal tables](temporal_tables.html).
+
+Flink uses the SQL syntax of `FOR SYSTEM_TIME AS OF` to query temporal table, 
which is proposed in SQL:2011. The syntax of temporal table join is as follows:
+
+{% highlight sql %}
+SELECT [column_list]
+FROM table1 [AS <alias1>]
+[LEFT] JOIN table2 FOR SYSTEM_TIME AS OF table1.{ proctime | rowtime } [AS 
<alias2>]
+ON table1.column-name1 = table2.column-name1
+{% endhighlight %}
+
+### Processing-time Temporal Joins
+
+Processing-time temporal join is the left input table using it's processing 
time to correlate the latest version of [regular 
table](temporal_tables.html#defining-regular-table) in a temporal join.
+Processing-time temporal join only supports regular table currently, and the 
supported regular table table can only contain append-only stream. 
+
+With a processing-time time attribute, it is impossible to pass _past_ time 
attributes as an argument to the temporal table.
+By definition, it is always the current timestamp. Thus, invocations of 
correlating temporal table will always return the latest known versions of the 
underlying table and any updates in the underlying history table will also 
immediately overwrite the current values.
+
+One can think about a processing-time temporal join as a simple `HashMap<K, 
V>` that stores all of the records from the build side.
+When a new record from the build side has the same key as some previous 
record, the old value is just simply overwritten.
+Every record from the probe side is always evaluated against the most 
recent/current state of the `HashMap`.
+
+The following processing-time temporal table join example shows an append-only 
table `orders` that should be joined with the table `LatestRates`,
+`LatestRates` is a dimension table (e.g. HBase table) that is materialized 
with the latest rate. At time `10:15`, `10:30`, `10:52`, the content of 
`LatestRates` looks as follows:
+
+{% highlight sql %}
+10:15> SELECT * FROM LatestRates;
+
+currency   rate
+======== ======
+US Dollar   102
+Euro        114
+Yen           1
+
+10:30> SELECT * FROM LatestRates;
+
+currency   rate
+======== ======
+US Dollar   102
+Euro        114
+Yen           1
+
+10:52> SELECT * FROM LatestRates;
+
+currency   rate
+======== ======
+US Dollar   102
+Euro        116     <==== changed from 114 to 116
+Yen           1
+{% endhighlight %}
+
+The content of `LastestRates` at time `10:15` and `10:30` are equal. The Euro 
rate has changed from 114 to 116 at `10:52`.
+
+`Orders` is an append-only table that represents payments for the given 
`amount` and the given `currency`. For example at `10:15` there was an order 
for an amount of `2 Euro`.
+
+{% highlight sql %}
+SELECT * FROM Orders;
+
+amount currency
+====== =========
+     2 Euro             <== arrived at time 10:15
+     1 US Dollar        <== arrived at time 10:30
+     2 Euro             <== arrived at time 10:52
+{% endhighlight %}
+
+Given that we would like to calculate the amount of all `Orders` converted to 
a common currency (`Yen`).
+
+For example, we would like to convert the following orders using the latest 
rate in `LatestRates`. The result would be:
+
+{% highlight text %}
+amount currency     rate   amout*rate
+====== ========= ======= ============
+     2 Euro          114          228    <== arrived at time 10:15
+     1 US Dollar     102          102    <== arrived at time 10:30
+     2 Euro          116          232    <== arrived at time 10:52
+{% endhighlight %}
+
+
+With the help of temporal table join, we can express such a query in SQL as:
+
+{% highlight sql %}
+SELECT
+  o.amout, o.currency, r.rate, o.amount * r.rate
+FROM
+  Orders AS o
+  JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
+  ON r.currency = o.currency
+{% endhighlight %}
+
+Each record from the probe side will be joined with the current version of the 
build side table. In our example, the query is using the processing-time 
notion, so a newly appended order would always be joined with the most recent 
version of `LatestRates` when executing the operation.
+Note that the result is not deterministic for processing-time.
+The processing-time temporal join is usually used to enrich the stream with 
external table (i.e. dimension table).
+
+### Event-time Temporal Joins

Review comment:
       Can you please move event time temporal joins above processing time? I 
think this is the more common use case. 

##########
File path: docs/dev/table/streaming/joins.md
##########
@@ -65,6 +65,220 @@ WHERE o.id = s.orderId AND
 
 Compared to a regular join operation, this kind of join only supports 
append-only tables with time attributes. Since time attributes are 
quasi-monotonic increasing, Flink can remove old values from its state without 
affecting the correctness of the result.
 
+Temporal Joins
+--------------
+<span class="label label-danger">Attention</span> This is only supported in 
Blink planner.
+<span class="label label-danger">Attention</span> Attention It is only 
supported in SQL, and not supported in Table API yet.
+
+Temporal join is an arbitrary table (left input/probe side) correlate with the 
versions of temporal table (right input/build side), The temporal table can be 
an external dimension table that changes over time 
+or a changelog that tracks all history changes. Please check the corresponding 
page for more information about [temporal tables](temporal_tables.html).
+
+Flink uses the SQL syntax of `FOR SYSTEM_TIME AS OF` to query temporal table, 
which is proposed in SQL:2011. The syntax of temporal table join is as follows:
+
+{% highlight sql %}
+SELECT [column_list]
+FROM table1 [AS <alias1>]
+[LEFT] JOIN table2 FOR SYSTEM_TIME AS OF table1.{ proctime | rowtime } [AS 
<alias2>]
+ON table1.column-name1 = table2.column-name1
+{% endhighlight %}
+
+### Processing-time Temporal Joins
+
+Processing-time temporal join is the left input table using it's processing 
time to correlate the latest version of [regular 
table](temporal_tables.html#defining-regular-table) in a temporal join.
+Processing-time temporal join only supports regular table currently, and the 
supported regular table table can only contain append-only stream. 

Review comment:
       ```suggestion
   Processing-time temporal join only supports regular tables currently, and 
the supported regular table can only contain append-only stream. 
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to