This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.13 by this push:
new f66c4c4 [FLINK-22177][docs][table] Add documentation for time
functions and time zone support
f66c4c4 is described below
commit f66c4c448294def6e1e079964a64b64b1b090bcd
Author: Leonard Xu <[email protected]>
AuthorDate: Mon Apr 19 23:23:38 2021 +0800
[FLINK-22177][docs][table] Add documentation for time functions and time
zone support
This closes #15634
---
.../docs/dev/table/concepts/time_attributes.md | 30 +-
.../content.zh/docs/dev/table/concepts/timezone.md | 575 +++++++++++++++++++++
.../docs/dev/table/concepts/time_attributes.md | 27 +-
docs/content/docs/dev/table/concepts/timezone.md | 575 +++++++++++++++++++++
4 files changed, 1201 insertions(+), 6 deletions(-)
diff --git a/docs/content.zh/docs/dev/table/concepts/time_attributes.md
b/docs/content.zh/docs/dev/table/concepts/time_attributes.md
index e527420..aabd41f 100644
--- a/docs/content.zh/docs/dev/table/concepts/time_attributes.md
+++ b/docs/content.zh/docs/dev/table/concepts/time_attributes.md
@@ -28,7 +28,7 @@ under the License.
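Flink can process data based on several different notions of *time*.
-- *Processing time* refers to the machine time at which a given operation is executed (also known as "wall-clock time").
+- *Processing time* refers to the machine time at which a given operation is executed (the familiar absolute time, e.g. Java's `System.currentTimeMillis()`).
- *Event time* refers to the time carried by the data itself, i.e. the time at which the event was produced.
- *Ingestion time* refers to the time at which the data enters Flink; internally it is treated as event time.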
@@ -94,7 +94,7 @@
env.set_stream_time_characteristic(TimeCharacteristic.ProcessingTime) # default
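### Defining in the CREATE TABLE DDL
-The processing time attribute can be defined as a computed column in the CREATE TABLE DDL using `PROCTIME()`. For more information about computed columns, see [CREATE TABLE DDL]({{< ref "docs/dev/table/sql/create" >}}#create-table)
+The processing time attribute can be defined as a computed column in the CREATE TABLE DDL using `PROCTIME()`; the return type of `PROCTIME()` is TIMESTAMP_LTZ. For more information about computed columns, see [CREATE TABLE DDL]({{< ref "docs/dev/table/sql/create" >}}#create-table)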
```sql
@@ -233,6 +233,10 @@ val windowedTable = tEnv
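The event time attribute can be defined with a WATERMARK statement in the CREATE TABLE DDL. The WATERMARK statement defines a watermark generation expression on an existing field and marks that field as the time attribute. For more information, see [CREATE TABLE DDL]({{< ref "docs/dev/table/sql/create" >}}#create-table)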
+Flink supports defining the event time attribute on a TIMESTAMP column or a TIMESTAMP_LTZ column.
+
+If the timestamp data in the source is represented as year-month-day-hour-minute-second, usually a string value without time zone information, e.g. `2020-04-15 20:13:40.564`, it is recommended to define the event time attribute on a `TIMESTAMP` column:
```sql
CREATE TABLE user_actions (
@@ -251,10 +255,30 @@ GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);
```
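+If the timestamp data in the source is represented as an epoch time, usually a long value, e.g. `1618989564564`, it is recommended to define the event time attribute on a `TIMESTAMP_LTZ` column: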
+ ```sql
+
+CREATE TABLE user_actions (
+ user_name STRING,
+ data STRING,
+ ts BIGINT,
+ time_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
+ -- declare time_ltz as the event time attribute and use a watermark strategy with a 5-second delay
+ WATERMARK FOR time_ltz AS time_ltz - INTERVAL '5' SECOND
+) WITH (
+ ...
+);
+
+SELECT TUMBLE_START(time_ltz, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
+FROM user_actions
+GROUP BY TUMBLE(time_ltz, INTERVAL '10' MINUTE);
+
+```
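### Defining during DataStream-to-Table conversion
-The event time attribute can be defined with the `.rowtime` suffix when defining the `DataStream` schema. [Timestamps and watermarks]({{< ref "docs/concepts/time" >}}) must already have been assigned on the `DataStream`.
+The event time attribute can be defined with the `.rowtime` suffix when defining the `DataStream` schema. [Timestamps and watermarks]({{< ref "docs/concepts/time" >}}) must already have been assigned on the `DataStream`.
+When converting a DataStream to a Table, Flink always derives the `rowtime` attribute as `TIMESTAMP WITHOUT TIME ZONE`, because `DataStream` has no notion of time zone, and treats all event time values as UTC values.
There are two ways to define the event time attribute when converting a `DataStream` to a `Table`. Depending on whether the field name decorated with the `.rowtime` suffix is an existing field, the event time field can be: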
diff --git a/docs/content.zh/docs/dev/table/concepts/timezone.md
b/docs/content.zh/docs/dev/table/concepts/timezone.md
new file mode 100644
index 0000000..84ab158
--- /dev/null
+++ b/docs/content.zh/docs/dev/table/concepts/timezone.md
@@ -0,0 +1,575 @@
+---
+title: "Time Zone"
+weight: 4
+type: docs
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+
+under the License.
+-->
+
+## Overview
+
+Flink provides rich data types for Date and Time, including `DATE`, `TIME`,
`TIMESTAMP`, `TIMESTAMP_LTZ`, `INTERVAL YEAR TO MONTH`, `INTERVAL DAY TO
SECOND` (please see [Date and Time]({{< ref "docs/dev/table/types"
>}}#date-and-time) for detailed information).
+Flink supports setting the time zone at the session level (please see [table.local-time-zone]({{< ref "docs/dev/table/config">}}#table-local-time-zone) for detailed information).
+Together, these timestamp data types and Flink's time zone support make it easy to process business data across time zones.
+
+## TIMESTAMP vs TIMESTAMP_LTZ
+
+### TIMESTAMP type
+ - `TIMESTAMP(p)` is an abbreviation for `TIMESTAMP(p) WITHOUT TIME ZONE`. The precision `p` ranges from 0 to 9 and defaults to 6.
+ - `TIMESTAMP` describes a timestamp consisting of year, month, day, hour, minute, second and fractional seconds.
+ - `TIMESTAMP` can be specified from a string literal, e.g.
+ ```sql
+Flink SQL> SELECT TIMESTAMP '1970-01-01 00:00:04.001';
++-------------------------+
+| 1970-01-01 00:00:04.001 |
++-------------------------+
+```
+
+### TIMESTAMP_LTZ type
+ - `TIMESTAMP_LTZ(p)` is an abbreviation for `TIMESTAMP(p) WITH LOCAL TIME ZONE`. The precision `p` ranges from 0 to 9 and defaults to 6.
+ - `TIMESTAMP_LTZ` describes an absolute point in time on the time-line. It stores a long value representing epoch milliseconds and an int representing nanosecond-of-millisecond. The epoch time is measured from the standard Java epoch of `1970-01-01T00:00:00Z`. Every datum of `TIMESTAMP_LTZ` type is interpreted in the local time zone configured in the current session for computation and visualization.
+ - `TIMESTAMP_LTZ` has no literal representation and thus cannot be specified from a literal. It can be derived from a long epoch time (e.g. the long value produced by Java's `System.currentTimeMillis()`):
+
+ ```sql
+Flink SQL> CREATE VIEW T1 AS SELECT TO_TIMESTAMP_LTZ(4001, 3);
+Flink SQL> SET table.local-time-zone=UTC;
+Flink SQL> SELECT * FROM T1;
++---------------------------+
+| TO_TIMESTAMP_LTZ(4001, 3) |
++---------------------------+
+| 1970-01-01 00:00:04.001 |
++---------------------------+
+
+Flink SQL> SET table.local-time-zone=Asia/Shanghai;
+Flink SQL> SELECT * FROM T1;
++---------------------------+
+| TO_TIMESTAMP_LTZ(4001, 3) |
++---------------------------+
+| 1970-01-01 08:00:04.001 |
++---------------------------+
+```
+
+- `TIMESTAMP_LTZ` can be used in business scenarios that span time zones, because an absolute point in time (e.g. the `4001` milliseconds above) describes the same instant in every time zone.
+As background, at any given moment `System.currentTimeMillis()` returns the same value on every machine in the world (e.g. the `4001` milliseconds in the example above); this is what "absolute point in time" means.
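
The idea that one absolute instant is displayed differently per session time zone can be sketched with plain `java.time`. This is only an illustration of the concept under stated assumptions, not Flink's internal implementation; the class `EpochInZones` and the helper `render` are hypothetical names introduced here.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Hypothetical illustration class; not part of Flink.
final class EpochInZones {
    private static final DateTimeFormatter FMT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

    // Renders one absolute instant (epoch milliseconds) as wall-clock text in the given zone.
    static String render(long epochMillis, String zoneId) {
        return FMT.format(Instant.ofEpochMilli(epochMillis).atZone(ZoneId.of(zoneId)));
    }

    public static void main(String[] args) {
        // The stored value (4001 ms after the epoch) never changes; only its rendering does.
        System.out.println(render(4001L, "UTC"));           // 1970-01-01 00:00:04.001
        System.out.println(render(4001L, "Asia/Shanghai")); // 1970-01-01 08:00:04.001
    }
}
```

The two printed values match the two `SELECT * FROM T1` results above, which suggests a useful mental model: the session time zone affects how a `TIMESTAMP_LTZ` value is shown, not which instant it denotes.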
+
+## Time Zone Usage
+The local time zone defines the time zone id of the current session. You can configure the time zone in the SQL Client or in applications.
+
+{{< tabs "SQL snippets" >}}
+{{< tab "SQL Client" >}}
+```sql
+-- set to UTC time zone
+Flink SQL> SET table.local-time-zone=UTC;
+
+-- set to Shanghai time zone
+Flink SQL> SET table.local-time-zone=Asia/Shanghai;
+
+-- set to Los_Angeles time zone
+Flink SQL> SET table.local-time-zone=America/Los_Angeles;
+```
+{{< /tab >}}
+{{< tab "Java" >}}
+```java
+EnvironmentSettings envSetting = EnvironmentSettings.newInstance().build();
+TableEnvironment tEnv = TableEnvironment.create(envSetting);
+
+// set to UTC time zone
+tEnv.getConfig().setLocalTimeZone(ZoneId.of("UTC"));
+
+// set to Shanghai time zone
+tEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));
+
+// set to Los_Angeles time zone
+tEnv.getConfig().setLocalTimeZone(ZoneId.of("America/Los_Angeles"));
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+val envSetting = EnvironmentSettings.newInstance.build
+val tEnv = TableEnvironment.create(envSetting)
+
+// set to UTC time zone
+tEnv.getConfig.setLocalTimeZone(ZoneId.of("UTC"))
+
+// set to Shanghai time zone
+tEnv.getConfig.setLocalTimeZone(ZoneId.of("Asia/Shanghai"))
+
+// set to Los_Angeles time zone
+tEnv.getConfig.setLocalTimeZone(ZoneId.of("America/Los_Angeles"))
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+The session time zone is used in Flink SQL in the following main ways:
+
+### Decide time functions return value
+The following time functions are influenced by the configured time zone.
+* LOCALTIME
+* LOCALTIMESTAMP
+* CURRENT_DATE
+* CURRENT_TIME
+* CURRENT_TIMESTAMP
+* CURRENT_ROW_TIMESTAMP()
+* NOW()
+* PROCTIME()
+
+
+```sql
+Flink SQL> SET sql-client.execution.result-mode=tableau;
+Flink SQL> CREATE VIEW MyView1 AS SELECT LOCALTIME, LOCALTIMESTAMP,
CURRENT_DATE, CURRENT_TIME, CURRENT_TIMESTAMP, CURRENT_ROW_TIMESTAMP(), NOW(),
PROCTIME();
+Flink SQL> DESC MyView1;
+```
+
+```
++------------------------+-----------------------------+-------+-----+--------+-----------+
+| name | type | null | key | extras
| watermark |
++------------------------+-----------------------------+-------+-----+--------+-----------+
+| LOCALTIME | TIME(0) | false | |
| |
+| LOCALTIMESTAMP | TIMESTAMP(3) | false | |
| |
+| CURRENT_DATE | DATE | false | |
| |
+| CURRENT_TIME | TIME(0) | false | |
| |
+| CURRENT_TIMESTAMP | TIMESTAMP_LTZ(3) | false | |
| |
+|CURRENT_ROW_TIMESTAMP() | TIMESTAMP_LTZ(3) | false | |
| |
+| NOW() | TIMESTAMP_LTZ(3) | false | |
| |
+| PROCTIME() | TIMESTAMP_LTZ(3) *PROCTIME* | false | |
| |
++------------------------+-----------------------------+-------+-----+--------+-----------+
+```
+
+```sql
+Flink SQL> SET table.local-time-zone=UTC;
+Flink SQL> SELECT * FROM MyView1;
+```
+
+```
++-----------+-------------------------+--------------+--------------+-------------------------+-------------------------+-------------------------+-------------------------+
+| LOCALTIME | LOCALTIMESTAMP | CURRENT_DATE | CURRENT_TIME |
CURRENT_TIMESTAMP | CURRENT_ROW_TIMESTAMP() | NOW() |
PROCTIME() |
++-----------+-------------------------+--------------+--------------+-------------------------+-------------------------+-------------------------+-------------------------+
+| 15:18:36 | 2021-04-15 15:18:36.384 | 2021-04-15 | 15:18:36 |
2021-04-15 15:18:36.384 | 2021-04-15 15:18:36.384 | 2021-04-15 15:18:36.384 |
2021-04-15 15:18:36.384 |
++-----------+-------------------------+--------------+--------------+-------------------------+-------------------------+-------------------------+-------------------------+
+```
+
+```sql
+Flink SQL> SET table.local-time-zone=Asia/Shanghai;
+Flink SQL> SELECT * FROM MyView1;
+```
+
+```
++-----------+-------------------------+--------------+--------------+-------------------------+-------------------------+-------------------------+-------------------------+
+| LOCALTIME | LOCALTIMESTAMP | CURRENT_DATE | CURRENT_TIME |
CURRENT_TIMESTAMP | CURRENT_ROW_TIMESTAMP() | NOW() |
PROCTIME() |
++-----------+-------------------------+--------------+--------------+-------------------------+-------------------------+-------------------------+-------------------------+
+| 23:18:36 | 2021-04-15 23:18:36.384 | 2021-04-15 | 23:18:36 |
2021-04-15 23:18:36.384 | 2021-04-15 23:18:36.384 | 2021-04-15 23:18:36.384 |
2021-04-15 23:18:36.384 |
++-----------+-------------------------+--------------+--------------+-------------------------+-------------------------+-------------------------+-------------------------+
+```
+
+### `TIMESTAMP_LTZ` string representation
+The session time zone is used when converting a `TIMESTAMP_LTZ` value to its string representation, i.e. when printing the value, casting it to `STRING`, casting it to `TIMESTAMP`, or casting a `TIMESTAMP` value to `TIMESTAMP_LTZ`:
+```sql
+Flink SQL> CREATE VIEW MyView2 AS SELECT TO_TIMESTAMP_LTZ(4001, 3) AS ltz,
TIMESTAMP '1970-01-01 00:00:01.001' AS ntz;
+Flink SQL> DESC MyView2;
+```
+
+```
++------+------------------+-------+-----+--------+-----------+
+| name | type | null | key | extras | watermark |
++------+------------------+-------+-----+--------+-----------+
+| ltz | TIMESTAMP_LTZ(3) | true | | | |
+| ntz | TIMESTAMP(3) | false | | | |
++------+------------------+-------+-----+--------+-----------+
+```
+
+```sql
+Flink SQL> SET table.local-time-zone=UTC;
+Flink SQL> SELECT * FROM MyView2;
+```
+
+```
++-------------------------+-------------------------+
+| ltz | ntz |
++-------------------------+-------------------------+
+| 1970-01-01 00:00:04.001 | 1970-01-01 00:00:01.001 |
++-------------------------+-------------------------+
+```
+
+```sql
+Flink SQL> SET table.local-time-zone=Asia/Shanghai;
+Flink SQL> SELECT * FROM MyView2;
+```
+
+```
++-------------------------+-------------------------+
+| ltz | ntz |
++-------------------------+-------------------------+
+| 1970-01-01 08:00:04.001 | 1970-01-01 00:00:01.001 |
++-------------------------+-------------------------+
+```
+
+```sql
+Flink SQL> CREATE VIEW MyView3 AS SELECT ltz, CAST(ltz AS TIMESTAMP(3)),
CAST(ltz AS STRING), ntz, CAST(ntz AS TIMESTAMP_LTZ(3)) FROM MyView2;
+```
+
+```
+Flink SQL> DESC MyView3;
++-------------------------------+------------------+-------+-----+--------+-----------+
+| name | type | null | key | extras |
watermark |
++-------------------------------+------------------+-------+-----+--------+-----------+
+| ltz | TIMESTAMP_LTZ(3) | true | | |
|
+| CAST(ltz AS TIMESTAMP(3)) | TIMESTAMP(3) | true | | |
|
+| CAST(ltz AS STRING) | STRING | true | | |
|
+| ntz | TIMESTAMP(3) | false | | |
|
+| CAST(ntz AS TIMESTAMP_LTZ(3)) | TIMESTAMP_LTZ(3) | false | | |
|
++-------------------------------+------------------+-------+-----+--------+-----------+
+```
+
+```sql
+Flink SQL> SELECT * FROM MyView3;
+```
+
+```
++-------------------------+---------------------------+-------------------------+-------------------------+-------------------------------+
+| ltz | CAST(ltz AS TIMESTAMP(3)) | CAST(ltz AS
STRING) | ntz | CAST(ntz AS TIMESTAMP_LTZ(3)) |
++-------------------------+---------------------------+-------------------------+-------------------------+-------------------------------+
+| 1970-01-01 08:00:04.001 | 1970-01-01 08:00:04.001 | 1970-01-01
08:00:04.001 | 1970-01-01 00:00:01.001 | 1970-01-01 00:00:01.001 |
++-------------------------+---------------------------+-------------------------+-------------------------+-------------------------------+
+```
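
The cast behaviour shown in `MyView3` can be approximated with `java.time`, assuming the session time zone is the zone used for both conversion directions. This is a sketch of the semantics only; `SessionZoneCasts` and its two helpers are hypothetical names, not Flink APIs.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;

// Hypothetical illustration class; not part of Flink.
final class SessionZoneCasts {
    // TIMESTAMP_LTZ -> TIMESTAMP: read the instant as wall-clock time in the session zone.
    static LocalDateTime ltzToTimestamp(Instant ltz, ZoneId sessionZone) {
        return LocalDateTime.ofInstant(ltz, sessionZone);
    }

    // TIMESTAMP -> TIMESTAMP_LTZ: interpret the wall-clock value as being in the session zone.
    static Instant timestampToLtz(LocalDateTime ntz, ZoneId sessionZone) {
        return ntz.atZone(sessionZone).toInstant();
    }

    public static void main(String[] args) {
        ZoneId shanghai = ZoneId.of("Asia/Shanghai");
        Instant ltz = Instant.ofEpochMilli(4001L);
        LocalDateTime ntz = LocalDateTime.parse("1970-01-01T00:00:01.001");

        System.out.println(ltzToTimestamp(ltz, shanghai)); // 1970-01-01T08:00:04.001
        System.out.println(timestampToLtz(ntz, shanghai)); // 1969-12-31T16:00:01.001Z
    }
}
```

The second result is an instant eight hours before the wall-clock value. When it is displayed again in the Shanghai session it reads `1970-01-01 00:00:01.001`, which is consistent with the last column of the `MyView3` output.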
+
+## Time Attribute and Time Zone
+
+Please see [Time Attribute]({{< ref
"docs/dev/table/concepts/time_attributes">}}#时间属性) for more information about
time attribute.
+
+### Processing Time and Time Zone
+Flink SQL defines the processing time attribute with the function `PROCTIME()`, whose return type is `TIMESTAMP_LTZ`.
+
+{{< hint info >}}
+Before Flink 1.13, the return type of `PROCTIME()` was `TIMESTAMP`, and the return value was the `TIMESTAMP` in the UTC time zone,
+e.g. when the wall clock shows `2021-03-01 12:00:00` in Shanghai, `PROCTIME()` displays `2021-03-01 04:00:00`, which is wrong.
+Flink 1.13 fixes this issue and uses `TIMESTAMP_LTZ` as the return type of `PROCTIME()`, so users no longer need to deal with time zone problems.
+{{< /hint >}}
+
+`PROCTIME()` always represents your local timestamp value, and using the `TIMESTAMP_LTZ` type also supports Daylight Saving Time well.
+
+```sql
+Flink SQL> SET table.local-time-zone=UTC;
+Flink SQL> SELECT PROCTIME();
+```
+```
++-------------------------+
+| PROCTIME() |
++-------------------------+
+| 2021-04-15 14:48:31.387 |
++-------------------------+
+```
+
+```sql
+Flink SQL> SET table.local-time-zone=Asia/Shanghai;
+Flink SQL> SELECT PROCTIME();
+```
+```
++-------------------------+
+| PROCTIME() |
++-------------------------+
+| 2021-04-15 22:48:31.387 |
++-------------------------+
+```
+
+```sql
+Flink SQL> CREATE TABLE MyTable1 (
+ item STRING,
+ price DOUBLE,
+ proctime as PROCTIME()
+ ) WITH (
+ 'connector' = 'socket',
+ 'hostname' = '127.0.0.1',
+ 'port' = '9999',
+ 'format' = 'csv'
+ );
+
+Flink SQL> CREATE VIEW MyView3 AS
+ SELECT
+ TUMBLE_START(proctime, INTERVAL '10' MINUTES) AS window_start,
+ TUMBLE_END(proctime, INTERVAL '10' MINUTES) AS window_end,
+ TUMBLE_PROCTIME(proctime, INTERVAL '10' MINUTES) as
window_proctime,
+ item,
+ MAX(price) as max_price
+ FROM MyTable1
+ GROUP BY TUMBLE(proctime, INTERVAL '10' MINUTES), item;
+
+Flink SQL> DESC MyView3;
+```
+
+```
++-----------------+-----------------------------+-------+-----+--------+-----------+
+| name | type | null | key | extras |
watermark |
++-----------------+-----------------------------+-------+-----+--------+-----------+
+| window_start | TIMESTAMP(3) | false | | |
|
+| window_end | TIMESTAMP(3) | false | | |
|
+| window_proctime | TIMESTAMP_LTZ(3) *PROCTIME* | false | | |
|
+| item | STRING | true | | |
|
+| max_price | DOUBLE | true | | |
|
++-----------------+-----------------------------+-------+-----+--------+-----------+
+```
+
+Use the following command to ingest data for `MyTable1` in a terminal:
+
+```
+> nc -lk 9999
+A,1.1
+B,1.2
+A,1.8
+B,2.5
+C,3.8
+```
+
+```sql
+Flink SQL> SET table.local-time-zone=UTC;
+Flink SQL> SELECT * FROM MyView3;
+```
+
+```
++-------------------------+-------------------------+-------------------------+------+-----------+
+| window_start | window_end | window_proctime
| item | max_price |
++-------------------------+-------------------------+-------------------------+------+-----------+
+| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:10:00.005
| A | 1.8 |
+| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:10:00.007
| B | 2.5 |
+| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:10:00.007
| C | 3.8 |
++-------------------------+-------------------------+-------------------------+------+-----------+
+```
+
+```sql
+Flink SQL> SET table.local-time-zone=Asia/Shanghai;
+Flink SQL> SELECT * FROM MyView3;
+```
+
+Returns different window start, window end and window proctime values compared to the calculation in the UTC time zone.
+```
++-------------------------+-------------------------+-------------------------+------+-----------+
+| window_start | window_end | window_proctime
| item | max_price |
++-------------------------+-------------------------+-------------------------+------+-----------+
+| 2021-04-15 22:00:00.000 | 2021-04-15 22:10:00.000 | 2021-04-15 22:10:00.005
| A | 1.8 |
+| 2021-04-15 22:00:00.000 | 2021-04-15 22:10:00.000 | 2021-04-15 22:10:00.007
| B | 2.5 |
+| 2021-04-15 22:00:00.000 | 2021-04-15 22:10:00.000 | 2021-04-15 22:10:00.007
| C | 3.8 |
++-------------------------+-------------------------+-------------------------+------+-----------+
+```
+
+{{< hint info >}}
+Processing time windows are non-deterministic, so each run produces different windows and different aggregations. The example above only illustrates how the time zone affects processing time windows.
+{{< /hint >}}
+
+### Event Time and Time Zone
+Flink supports defining the event time attribute on a TIMESTAMP column or a TIMESTAMP_LTZ column.
+
+#### Event Time Attribute on TIMESTAMP
+If the timestamp data in the source is represented as
year-month-day-hour-minute-second, usually a string value without time-zone
information, e.g. `2020-04-15 20:13:40.564`, it's recommended to define the
event time attribute as a `TIMESTAMP` column:
+```sql
+Flink SQL> CREATE TABLE MyTable2 (
+ item STRING,
+ price DOUBLE,
+ ts TIMESTAMP(3), -- TIMESTAMP data type
+ WATERMARK FOR ts AS ts - INTERVAL '10' SECOND
+ ) WITH (
+ 'connector' = 'socket',
+ 'hostname' = '127.0.0.1',
+ 'port' = '9999',
+ 'format' = 'csv'
+ );
+
+Flink SQL> CREATE VIEW MyView4 AS
+ SELECT
+ TUMBLE_START(ts, INTERVAL '10' MINUTES) AS window_start,
+ TUMBLE_END(ts, INTERVAL '10' MINUTES) AS window_end,
+ TUMBLE_ROWTIME(ts, INTERVAL '10' MINUTES) as window_rowtime,
+ item,
+ MAX(price) as max_price
+ FROM MyTable2
+ GROUP BY TUMBLE(ts, INTERVAL '10' MINUTES), item;
+
+Flink SQL> DESC MyView4;
+```
+
+```
++----------------+------------------------+------+-----+--------+-----------+
+| name | type | null | key | extras | watermark |
++----------------+------------------------+------+-----+--------+-----------+
+| window_start | TIMESTAMP(3) | true | | | |
+| window_end | TIMESTAMP(3) | true | | | |
+| window_rowtime | TIMESTAMP(3) *ROWTIME* | true | | | |
+| item | STRING | true | | | |
+| max_price | DOUBLE | true | | | |
++----------------+------------------------+------+-----+--------+-----------+
+```
+
+Use the following command to ingest data for `MyTable2` in a terminal:
+
+```
+> nc -lk 9999
+A,1.1,2021-04-15 14:01:00
+B,1.2,2021-04-15 14:02:00
+A,1.8,2021-04-15 14:03:00
+B,2.5,2021-04-15 14:04:00
+C,3.8,2021-04-15 14:05:00
+C,3.8,2021-04-15 14:11:00
+```
+
+```sql
+Flink SQL> SET table.local-time-zone=UTC;
+Flink SQL> SELECT * FROM MyView4;
+```
+
+```
++-------------------------+-------------------------+-------------------------+------+-----------+
+| window_start | window_end | window_rowtime
| item | max_price |
++-------------------------+-------------------------+-------------------------+------+-----------+
+| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999
| A | 1.8 |
+| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999
| B | 2.5 |
+| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999
| C | 3.8 |
++-------------------------+-------------------------+-------------------------+------+-----------+
+```
+
+```sql
+Flink SQL> SET table.local-time-zone=Asia/Shanghai;
+Flink SQL> SELECT * FROM MyView4;
+```
+
+Returns the same window start, window end and window rowtime as the calculation in the UTC time zone.
+```
++-------------------------+-------------------------+-------------------------+------+-----------+
+| window_start | window_end | window_rowtime
| item | max_price |
++-------------------------+-------------------------+-------------------------+------+-----------+
+| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999
| A | 1.8 |
+| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999
| B | 2.5 |
+| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999
| C | 3.8 |
++-------------------------+-------------------------+-------------------------+------+-----------+
+```
+
+#### Event Time Attribute on TIMESTAMP_LTZ
+If the timestamp data in the source is represented as an epoch time, usually a long value, e.g. `1618989564564`, it's recommended to define the event time attribute as a `TIMESTAMP_LTZ` column.
+```sql
+Flink SQL> CREATE TABLE MyTable3 (
+ item STRING,
+ price DOUBLE,
+ ts BIGINT, -- long time value in epoch milliseconds
+ ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
+ WATERMARK FOR ts_ltz AS ts_ltz - INTERVAL '10' SECOND
+ ) WITH (
+ 'connector' = 'socket',
+ 'hostname' = '127.0.0.1',
+ 'port' = '9999',
+ 'format' = 'csv'
+ );
+
+Flink SQL> CREATE VIEW MyView5 AS
+ SELECT
+ TUMBLE_START(ts_ltz, INTERVAL '10' MINUTES) AS window_start,
+ TUMBLE_END(ts_ltz, INTERVAL '10' MINUTES) AS window_end,
+ TUMBLE_ROWTIME(ts_ltz, INTERVAL '10' MINUTES) as
window_rowtime,
+ item,
+ MAX(price) as max_price
+ FROM MyTable3
+ GROUP BY TUMBLE(ts_ltz, INTERVAL '10' MINUTES), item;
+
+Flink SQL> DESC MyView5;
+```
+
+```
++----------------+----------------------------+-------+-----+--------+-----------+
+| name | type | null | key | extras |
watermark |
++----------------+----------------------------+-------+-----+--------+-----------+
+| window_start | TIMESTAMP(3) | false | | |
|
+| window_end | TIMESTAMP(3) | false | | |
|
+| window_rowtime | TIMESTAMP_LTZ(3) *ROWTIME* | true | | |
|
+| item | STRING | true | | |
|
+| max_price | DOUBLE | true | | |
|
++----------------+----------------------------+-------+-----+--------+-----------+
+```
+
+The input data of MyTable3 is:
+```
+A,1.1,1618495260000 # The corresponding UTC timestamp is 2021-04-15 14:01:00
+B,1.2,1618495320000 # The corresponding UTC timestamp is 2021-04-15 14:02:00
+A,1.8,1618495380000 # The corresponding UTC timestamp is 2021-04-15 14:03:00
+B,2.5,1618495440000 # The corresponding UTC timestamp is 2021-04-15 14:04:00
+C,3.8,1618495500000 # The corresponding UTC timestamp is 2021-04-15 14:05:00
+C,3.8,1618495860000 # The corresponding UTC timestamp is 2021-04-15 14:11:00
+```
+
+```sql
+Flink SQL> SET table.local-time-zone=UTC;
+Flink SQL> SELECT * FROM MyView5;
+```
+
+```
++-------------------------+-------------------------+-------------------------+------+-----------+
+| window_start | window_end | window_rowtime
| item | max_price |
++-------------------------+-------------------------+-------------------------+------+-----------+
+| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999
| A | 1.8 |
+| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999
| B | 2.5 |
+| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999
| C | 3.8 |
++-------------------------+-------------------------+-------------------------+------+-----------+
+```
+
+```sql
+Flink SQL> SET table.local-time-zone=Asia/Shanghai;
+Flink SQL> SELECT * FROM MyView5;
+```
+
+Returns different window start, window end and window rowtime values compared to the calculation in the UTC time zone.
+```
++-------------------------+-------------------------+-------------------------+------+-----------+
+| window_start | window_end | window_rowtime
| item | max_price |
++-------------------------+-------------------------+-------------------------+------+-----------+
+| 2021-04-15 22:00:00.000 | 2021-04-15 22:10:00.000 | 2021-04-15 22:09:59.999
| A | 1.8 |
+| 2021-04-15 22:00:00.000 | 2021-04-15 22:10:00.000 | 2021-04-15 22:09:59.999
| B | 2.5 |
+| 2021-04-15 22:00:00.000 | 2021-04-15 22:10:00.000 | 2021-04-15 22:09:59.999
| C | 3.8 |
++-------------------------+-------------------------+-------------------------+------+-----------+
+```
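
Why the same epoch value lands in a `14:00` window under UTC but a `22:00` window under Asia/Shanghai can be sketched as follows. This is a simplified model that assumes window boundaries are aligned on local wall-clock time; it is not Flink's window assigner, and `TumbleWindowByZone` and `windowStart` are hypothetical names.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.temporal.ChronoUnit;

// Hypothetical illustration class; not part of Flink.
final class TumbleWindowByZone {
    // Start of the 10-minute tumbling window containing the instant,
    // expressed as local wall-clock time in the given zone.
    static LocalDateTime windowStart(long epochMillis, String zoneId) {
        LocalDateTime local =
                LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis), ZoneId.of(zoneId));
        LocalDateTime minute = local.truncatedTo(ChronoUnit.MINUTES);
        return minute.minusMinutes(minute.getMinute() % 10);
    }

    public static void main(String[] args) {
        long firstRow = 1618495260000L; // first input row of MyTable3
        System.out.println(windowStart(firstRow, "UTC"));           // 2021-04-15T14:00
        System.out.println(windowStart(firstRow, "Asia/Shanghai")); // 2021-04-15T22:00
    }
}
```

The row itself is unchanged between the two runs; only the wall-clock labels of the window it falls into differ, as in the two `MyView5` result tables.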
+
+## Daylight Saving Time Support
+Flink SQL supports defining time attributes on a TIMESTAMP_LTZ column. Based on this, Flink SQL uses the TIMESTAMP and TIMESTAMP_LTZ types together in window processing to support Daylight Saving Time.
+
+
+Flink uses timestamp literals to split windows and assigns each row to a window according to the row's epoch time. This means Flink uses the `TIMESTAMP` type for window start and window end (e.g. `TUMBLE_START` and `TUMBLE_END`) and the `TIMESTAMP_LTZ` type for window time attributes (e.g. `TUMBLE_PROCTIME`, `TUMBLE_ROWTIME`).
+Take a tumble window as an example. Daylight Saving Time in Los_Angeles starts at 2021-03-14 02:00:00:
+```
+long epoch1 = 1615708800000L; // 2021-03-14 00:00:00
+long epoch2 = 1615712400000L; // 2021-03-14 01:00:00
+long epoch3 = 1615716000000L; // 2021-03-14 03:00:00, skip one hour (2021-03-14 02:00:00)
+long epoch4 = 1615719600000L; // 2021-03-14 04:00:00
+```
+The tumble window [2021-03-14 00:00:00, 2021-03-14 04:00:00] collects 3 hours of data in the Los_Angeles time zone, but 4 hours of data in time zones without DST. All the user needs to do is define the time attribute on a TIMESTAMP_LTZ column.
+
+All windows in Flink, such as Hop, Session and Cumulative windows, work this way, and all operations in Flink SQL support TIMESTAMP_LTZ well, so Flink gracefully supports Daylight Saving Time.
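
The three-hour versus four-hour claim can be checked with `java.time`, which applies the same DST rules for `America/Los_Angeles`. This sketch only verifies the arithmetic of the example and does not involve Flink; `DstWindowLength` and its helpers are hypothetical names.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneId;

// Hypothetical illustration class; not part of Flink.
final class DstWindowLength {
    // Epoch milliseconds of a wall-clock timestamp interpreted in the given zone.
    static long epochMillis(String local, String zoneId) {
        return LocalDateTime.parse(local).atZone(ZoneId.of(zoneId)).toInstant().toEpochMilli();
    }

    // Real elapsed time between two wall-clock timestamps interpreted in the given zone.
    static Duration elapsed(String startLocal, String endLocal, String zoneId) {
        ZoneId zone = ZoneId.of(zoneId);
        return Duration.between(
                LocalDateTime.parse(startLocal).atZone(zone),
                LocalDateTime.parse(endLocal).atZone(zone));
    }

    public static void main(String[] args) {
        String start = "2021-03-14T00:00:00";
        String end = "2021-03-14T04:00:00";
        System.out.println(epochMillis(start, "America/Los_Angeles"));            // 1615708800000
        System.out.println(elapsed(start, end, "America/Los_Angeles").toHours()); // 3
        System.out.println(elapsed(start, end, "Asia/Shanghai").toHours());       // 4
    }
}
```

The first value equals `epoch1` from the listing above, and the window that is labelled four hours wide on the wall clock covers only three hours of real time in Los_Angeles because 02:00 to 03:00 does not exist on that day.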
+
+
+## Difference between Batch and Streaming Mode
+The following time functions:
+* LOCALTIME
+* LOCALTIMESTAMP
+* CURRENT_DATE
+* CURRENT_TIME
+* CURRENT_TIMESTAMP
+* NOW()
+
+Flink evaluates these functions according to the execution mode. In streaming mode they are evaluated for each record. In batch mode they are evaluated once when the query starts, and the same result is used for every row.
+
+The following time functions are evaluated for each record in both batch and streaming mode:
+
+* CURRENT_ROW_TIMESTAMP()
+* PROCTIME()
+
+{{< top >}}
diff --git a/docs/content/docs/dev/table/concepts/time_attributes.md
b/docs/content/docs/dev/table/concepts/time_attributes.md
index c657a78..55e303a 100644
--- a/docs/content/docs/dev/table/concepts/time_attributes.md
+++ b/docs/content/docs/dev/table/concepts/time_attributes.md
@@ -28,7 +28,7 @@ under the License.
Flink can process data based on different notions of time.
-- *Processing time* refers to the machine's system time (also known as
"wall-clock time") that is executing the respective operation.
+- *Processing time* refers to the machine's system time (also known as epoch
time, e.g. Java's `System.currentTimeMillis()`) that is executing the
respective operation.
- *Event time* refers to the processing of streaming data based on timestamps
that are attached to each row. The timestamps can encode when an event happened.
For more information about time handling in Flink, see the introduction about
[event time and watermarks]({{< ref "docs/concepts/time" >}}).
@@ -59,6 +59,8 @@ Event time attributes can be defined in `CREATE` table DDL or
during DataStream-
The event time attribute is defined using a `WATERMARK` statement in `CREATE`
table DDL. A watermark statement defines a watermark generation expression on
an existing event time field, which marks the event time field as the event
time attribute. Please see [CREATE TABLE DDL]({{< ref
"docs/dev/table/sql/create" >}}#create-table) for more information about
watermark statement and watermark strategies.
+Flink supports defining the event time attribute on a TIMESTAMP column or a TIMESTAMP_LTZ column.
+If the timestamp data in the source is represented as year-month-day-hour-minute-second, usually a string value without time-zone information, e.g. `2020-04-15 20:13:40.564`, it's recommended to define the event time attribute as a `TIMESTAMP` column:
```sql
CREATE TABLE user_actions (
@@ -77,10 +79,29 @@ GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);
```
+If the timestamp data in the source is represented as an epoch time, usually a long value, e.g. `1618989564564`, it's recommended to define the event time attribute as a `TIMESTAMP_LTZ` column:
+```sql
+
+CREATE TABLE user_actions (
+ user_name STRING,
+ data STRING,
+ ts BIGINT,
+ time_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
+ -- declare time_ltz as the event time attribute and use a watermark strategy with a 5-second delay
+ WATERMARK FOR time_ltz AS time_ltz - INTERVAL '5' SECOND
+) WITH (
+ ...
+);
+
+SELECT TUMBLE_START(time_ltz, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
+FROM user_actions
+GROUP BY TUMBLE(time_ltz, INTERVAL '10' MINUTE);
+
+```
### During DataStream-to-Table Conversion
-When converting a `DataStream` to a table, an event time attribute can be
defined with the `.rowtime` property during schema definition. [Timestamps and
watermarks]({{< ref "docs/concepts/time" >}}) must have already been assigned
in the `DataStream` being converted.
+When converting a `DataStream` to a table, an event time attribute can be defined with the `.rowtime` property during schema definition. [Timestamps and watermarks]({{< ref "docs/concepts/time" >}}) must have already been assigned in the `DataStream` being converted. During the conversion, Flink always derives the rowtime attribute as TIMESTAMP WITHOUT TIME ZONE, because DataStream has no notion of time zone, and treats all event time values as being in UTC.
There are two ways of defining the time attribute when converting a
`DataStream` into a `Table`. Depending on whether the specified `.rowtime`
field name exists in the schema of the `DataStream`, the timestamp is either
(1) appended as a new column, or it
(2) replaces an existing column.
@@ -155,7 +176,7 @@ There are three ways to define a processing time attribute.
### Defining in DDL
-The processing time attribute is defined as a computed column in `CREATE`
table DDL using the system `PROCTIME()` function. Please see [CREATE TABLE
DDL]({{< ref "docs/dev/table/sql/create" >}}#create-table) for more information
about computed column.
+The processing time attribute is defined as a computed column in `CREATE` table DDL using the system `PROCTIME()` function, whose return type is `TIMESTAMP_LTZ`. Please see [CREATE TABLE DDL]({{< ref "docs/dev/table/sql/create" >}}#create-table) for more information about computed columns.
```sql
diff --git a/docs/content/docs/dev/table/concepts/timezone.md
b/docs/content/docs/dev/table/concepts/timezone.md
new file mode 100644
index 0000000..dd5d97c
--- /dev/null
+++ b/docs/content/docs/dev/table/concepts/timezone.md
@@ -0,0 +1,575 @@
+---
+title: "Time Zone"
+weight: 4
+type: docs
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+
+under the License.
+-->
+
+## Overview
+
+Flink provides rich data types for date and time, including `DATE`, `TIME`, `TIMESTAMP`, `TIMESTAMP_LTZ`, `INTERVAL YEAR TO MONTH` and `INTERVAL DAY TO SECOND` (please see [Date and Time]({{< ref "docs/dev/table/types" >}}#date-and-time) for detailed information).
+Flink supports setting the time zone at the session level (please see [table.local-time-zone]({{< ref "docs/dev/table/config">}}#table-local-time-zone) for detailed information).
+These timestamp data types, together with Flink's time zone support, make it easy to process business data across time zones.
+
+## TIMESTAMP vs TIMESTAMP_LTZ
+
+### TIMESTAMP type
+ - `TIMESTAMP(p)` is an abbreviation for `TIMESTAMP(p) WITHOUT TIME ZONE`. The precision `p` ranges from 0 to 9 and defaults to 6.
+ - `TIMESTAMP` describes a timestamp consisting of year, month, day, hour, minute, second and fractional seconds.
+ - `TIMESTAMP` can be specified from a string literal, e.g.
+ ```sql
+Flink SQL> SELECT TIMESTAMP '1970-01-01 00:00:04.001';
++-------------------------+
+| 1970-01-01 00:00:04.001 |
++-------------------------+
+```
+
+### TIMESTAMP_LTZ type
+ - `TIMESTAMP_LTZ(p)` is an abbreviation for `TIMESTAMP(p) WITH LOCAL TIME ZONE`. The precision `p` ranges from 0 to 9 and defaults to 6.
+ - `TIMESTAMP_LTZ` describes an absolute point in time on the time-line. It stores a long value representing epoch-milliseconds and an int representing nanosecond-of-millisecond. The epoch time is measured from the standard Java epoch of `1970-01-01T00:00:00Z`. Every datum of `TIMESTAMP_LTZ` type is interpreted in the local time zone configured in the current session for computation and visualization.
+ - `TIMESTAMP_LTZ` has no literal representation and thus cannot be specified from a literal. It can be derived from a long epoch time (e.g. the value returned by Java's `System.currentTimeMillis()`):
+
+ ```sql
+Flink SQL> CREATE VIEW T1 AS SELECT TO_TIMESTAMP_LTZ(4001, 3);
+Flink SQL> SET table.local-time-zone=UTC;
+Flink SQL> SELECT * FROM T1;
++---------------------------+
+| TO_TIMESTAMP_LTZ(4001, 3) |
++---------------------------+
+| 1970-01-01 00:00:04.001 |
++---------------------------+
+
+Flink SQL> SET table.local-time-zone=Asia/Shanghai;
+Flink SQL> SELECT * FROM T1;
++---------------------------+
+| TO_TIMESTAMP_LTZ(4001, 3) |
++---------------------------+
+| 1970-01-01 08:00:04.001 |
++---------------------------+
+```
+
+- `TIMESTAMP_LTZ` can be used in business scenarios that span time zones, because an absolute time point (e.g. the `4001` milliseconds above) describes the same instant in every time zone.
+As background: at any given instant, `System.currentTimeMillis()` returns the same value on every machine in the world (e.g. the `4001` milliseconds in the example above). This is what an absolute time point means.
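
The same idea can be reproduced outside Flink with plain `java.time`. The following is a minimal sketch, not Flink's implementation: the class name `LtzRendering` and the helper `render` are hypothetical names introduced only for this illustration. It renders one epoch-millisecond value in two zones, mirroring what changing `table.local-time-zone` does to the displayed value.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Hypothetical helper, for illustration only (not part of Flink).
final class LtzRendering {
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

    // Renders an absolute instant (epoch milliseconds) as wall-clock time in the given zone.
    static String render(long epochMillis, String zoneId) {
        return Instant.ofEpochMilli(epochMillis)
                .atZone(ZoneId.of(zoneId))
                .format(FORMAT);
    }

    public static void main(String[] args) {
        // The stored value is identical; only the rendering differs per zone.
        System.out.println(render(4001L, "UTC"));           // 1970-01-01 00:00:04.001
        System.out.println(render(4001L, "Asia/Shanghai")); // 1970-01-01 08:00:04.001
    }
}
```

The stored long never changes; the zone is applied only when the value is turned into a wall-clock representation.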
+
+## Time Zone Usage
+The local time zone defines the time zone id of the current session. You can configure the time zone in the SQL Client or in applications.
+
+{{< tabs "SQL snippets" >}}
+{{< tab "SQL Client" >}}
+```sql
+-- set to UTC time zone
+Flink SQL> SET table.local-time-zone=UTC;
+
+-- set to Shanghai time zone
+Flink SQL> SET table.local-time-zone=Asia/Shanghai;
+
+-- set to Los_Angeles time zone
+Flink SQL> SET table.local-time-zone=America/Los_Angeles;
+```
+{{< /tab >}}
+{{< tab "Java" >}}
+```java
+import java.time.ZoneId;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+
+EnvironmentSettings envSetting = EnvironmentSettings.newInstance().build();
+TableEnvironment tEnv = TableEnvironment.create(envSetting);
+
+// set to UTC time zone
+tEnv.getConfig().setLocalTimeZone(ZoneId.of("UTC"));
+
+// set to Shanghai time zone
+tEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));
+
+// set to Los_Angeles time zone
+tEnv.getConfig().setLocalTimeZone(ZoneId.of("America/Los_Angeles"));
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+val envSetting = EnvironmentSettings.newInstance.build
+val tEnv = TableEnvironment.create(envSetting)
+
+// set to UTC time zone
+tEnv.getConfig.setLocalTimeZone(ZoneId.of("UTC"))
+
+// set to Shanghai time zone
+tEnv.getConfig.setLocalTimeZone(ZoneId.of("Asia/Shanghai"))
+
+// set to Los_Angeles time zone
+tEnv.getConfig.setLocalTimeZone(ZoneId.of("America/Los_Angeles"))
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+The session time zone is used in Flink SQL mainly in the following ways:
+
+### Decide time functions return value
+The following time functions are influenced by the configured time zone.
+* LOCALTIME
+* LOCALTIMESTAMP
+* CURRENT_DATE
+* CURRENT_TIME
+* CURRENT_TIMESTAMP
+* CURRENT_ROW_TIMESTAMP()
+* NOW()
+* PROCTIME()
+
+
+```sql
+Flink SQL> SET sql-client.execution.result-mode=tableau;
+Flink SQL> CREATE VIEW MyView1 AS SELECT LOCALTIME, LOCALTIMESTAMP,
CURRENT_DATE, CURRENT_TIME, CURRENT_TIMESTAMP, CURRENT_ROW_TIMESTAMP(), NOW(),
PROCTIME();
+Flink SQL> DESC MyView1;
+```
+
+```
++------------------------+-----------------------------+-------+-----+--------+-----------+
+| name | type | null | key | extras
| watermark |
++------------------------+-----------------------------+-------+-----+--------+-----------+
+| LOCALTIME | TIME(0) | false | |
| |
+| LOCALTIMESTAMP | TIMESTAMP(3) | false | |
| |
+| CURRENT_DATE | DATE | false | |
| |
+| CURRENT_TIME | TIME(0) | false | |
| |
+| CURRENT_TIMESTAMP | TIMESTAMP_LTZ(3) | false | |
| |
+|CURRENT_ROW_TIMESTAMP() | TIMESTAMP_LTZ(3) | false | |
| |
+| NOW() | TIMESTAMP_LTZ(3) | false | |
| |
+| PROCTIME() | TIMESTAMP_LTZ(3) *PROCTIME* | false | |
| |
++------------------------+-----------------------------+-------+-----+--------+-----------+
+```
+
+```sql
+Flink SQL> SET table.local-time-zone=UTC;
+Flink SQL> SELECT * FROM MyView1;
+```
+
+```
++-----------+-------------------------+--------------+--------------+-------------------------+-------------------------+-------------------------+-------------------------+
+| LOCALTIME | LOCALTIMESTAMP | CURRENT_DATE | CURRENT_TIME |
CURRENT_TIMESTAMP | CURRENT_ROW_TIMESTAMP() | NOW() |
PROCTIME() |
++-----------+-------------------------+--------------+--------------+-------------------------+-------------------------+-------------------------+-------------------------+
+| 15:18:36 | 2021-04-15 15:18:36.384 | 2021-04-15 | 15:18:36 |
2021-04-15 15:18:36.384 | 2021-04-15 15:18:36.384 | 2021-04-15 15:18:36.384 |
2021-04-15 15:18:36.384 |
++-----------+-------------------------+--------------+--------------+-------------------------+-------------------------+-------------------------+-------------------------+
+```
+
+```sql
+Flink SQL> SET table.local-time-zone=Asia/Shanghai;
+Flink SQL> SELECT * FROM MyView1;
+```
+
+```
++-----------+-------------------------+--------------+--------------+-------------------------+-------------------------+-------------------------+-------------------------+
+| LOCALTIME | LOCALTIMESTAMP | CURRENT_DATE | CURRENT_TIME |
CURRENT_TIMESTAMP | CURRENT_ROW_TIMESTAMP() | NOW() |
PROCTIME() |
++-----------+-------------------------+--------------+--------------+-------------------------+-------------------------+-------------------------+-------------------------+
+| 23:18:36 | 2021-04-15 23:18:36.384 | 2021-04-15 | 23:18:36 |
2021-04-15 23:18:36.384 | 2021-04-15 23:18:36.384 | 2021-04-15 23:18:36.384 |
2021-04-15 23:18:36.384 |
++-----------+-------------------------+--------------+--------------+-------------------------+-------------------------+-------------------------+-------------------------+
+```
+
+### `TIMESTAMP_LTZ` string representation
+The session time zone is used when representing a `TIMESTAMP_LTZ` value as a string, i.e. when printing the value, casting it to `STRING`, casting it to `TIMESTAMP`, or casting a `TIMESTAMP` value to `TIMESTAMP_LTZ`:
+```sql
+Flink SQL> CREATE VIEW MyView2 AS SELECT TO_TIMESTAMP_LTZ(4001, 3) AS ltz,
TIMESTAMP '1970-01-01 00:00:01.001' AS ntz;
+Flink SQL> DESC MyView2;
+```
+
+```
++------+------------------+-------+-----+--------+-----------+
+| name | type | null | key | extras | watermark |
++------+------------------+-------+-----+--------+-----------+
+| ltz | TIMESTAMP_LTZ(3) | true | | | |
+| ntz | TIMESTAMP(3) | false | | | |
++------+------------------+-------+-----+--------+-----------+
+```
+
+```sql
+Flink SQL> SET table.local-time-zone=UTC;
+Flink SQL> SELECT * FROM MyView2;
+```
+
+```
++-------------------------+-------------------------+
+| ltz | ntz |
++-------------------------+-------------------------+
+| 1970-01-01 00:00:04.001 | 1970-01-01 00:00:01.001 |
++-------------------------+-------------------------+
+```
+
+```sql
+Flink SQL> SET table.local-time-zone=Asia/Shanghai;
+Flink SQL> SELECT * FROM MyView2;
+```
+
+```
++-------------------------+-------------------------+
+| ltz | ntz |
++-------------------------+-------------------------+
+| 1970-01-01 08:00:04.001 | 1970-01-01 00:00:01.001 |
++-------------------------+-------------------------+
+```
+
+```sql
+Flink SQL> CREATE VIEW MyView3 AS SELECT ltz, CAST(ltz AS TIMESTAMP(3)),
CAST(ltz AS STRING), ntz, CAST(ntz AS TIMESTAMP_LTZ(3)) FROM MyView2;
+```
+
+```
+Flink SQL> DESC MyView3;
++-------------------------------+------------------+-------+-----+--------+-----------+
+| name | type | null | key | extras |
watermark |
++-------------------------------+------------------+-------+-----+--------+-----------+
+| ltz | TIMESTAMP_LTZ(3) | true | | |
|
+| CAST(ltz AS TIMESTAMP(3)) | TIMESTAMP(3) | true | | |
|
+| CAST(ltz AS STRING) | STRING | true | | |
|
+| ntz | TIMESTAMP(3) | false | | |
|
+| CAST(ntz AS TIMESTAMP_LTZ(3)) | TIMESTAMP_LTZ(3) | false | | |
|
++-------------------------------+------------------+-------+-----+--------+-----------+
+```
+
+```sql
+Flink SQL> SELECT * FROM MyView3;
+```
+
+```
++-------------------------+---------------------------+-------------------------+-------------------------+-------------------------------+
+| ltz | CAST(ltz AS TIMESTAMP(3)) | CAST(ltz AS
STRING) | ntz | CAST(ntz AS TIMESTAMP_LTZ(3)) |
++-------------------------+---------------------------+-------------------------+-------------------------+-------------------------------+
+| 1970-01-01 08:00:04.001 | 1970-01-01 08:00:04.001 | 1970-01-01
08:00:04.001 | 1970-01-01 00:00:01.001 | 1970-01-01 00:00:01.001 |
++-------------------------+---------------------------+-------------------------+-------------------------+-------------------------------+
+```
+
+## Time Attribute and Time Zone
+
+Please see [Time Attribute]({{< ref
"docs/dev/table/concepts/time_attributes">}}#time-attributes) for more
information about time attribute.
+
+### Processing Time and Time Zone
+Flink SQL defines the processing time attribute with the function `PROCTIME()`, whose return type is `TIMESTAMP_LTZ`.
+
+{{< hint info >}}
+Before Flink 1.13, the return type of `PROCTIME()` was `TIMESTAMP`, and the return value was the `TIMESTAMP` in the UTC time zone,
+e.g. when the wall-clock shows `2021-03-01 12:00:00` in Shanghai, `PROCTIME()` displays `2021-03-01 04:00:00`, which is wrong.
+Flink 1.13 fixes this issue and uses `TIMESTAMP_LTZ` as the return type of `PROCTIME()`, so users no longer need to deal with time zone problems.
+{{< /hint >}}
+
+`PROCTIME()` always represents your local timestamp value. Using the `TIMESTAMP_LTZ` type also supports Daylight Saving Time well.
+
+```sql
+Flink SQL> SET table.local-time-zone=UTC;
+Flink SQL> SELECT PROCTIME();
+```
+```
++-------------------------+
+| PROCTIME() |
++-------------------------+
+| 2021-04-15 14:48:31.387 |
++-------------------------+
+```
+
+```sql
+Flink SQL> SET table.local-time-zone=Asia/Shanghai;
+Flink SQL> SELECT PROCTIME();
+```
+```
++-------------------------+
+| PROCTIME() |
++-------------------------+
+| 2021-04-15 22:48:31.387 |
++-------------------------+
+```
+
+```sql
+Flink SQL> CREATE TABLE MyTable1 (
+ item STRING,
+ price DOUBLE,
+ proctime as PROCTIME()
+ ) WITH (
+ 'connector' = 'socket',
+ 'hostname' = '127.0.0.1',
+ 'port' = '9999',
+ 'format' = 'csv'
+ );
+
+Flink SQL> CREATE VIEW MyView3 AS
+ SELECT
+ TUMBLE_START(proctime, INTERVAL '10' MINUTES) AS window_start,
+ TUMBLE_END(proctime, INTERVAL '10' MINUTES) AS window_end,
+ TUMBLE_PROCTIME(proctime, INTERVAL '10' MINUTES) as
window_proctime,
+ item,
+ MAX(price) as max_price
+ FROM MyTable1
+ GROUP BY TUMBLE(proctime, INTERVAL '10' MINUTES), item;
+
+Flink SQL> DESC MyView3;
+```
+
+```
++-----------------+-----------------------------+-------+-----+--------+-----------+
+| name | type | null | key | extras |
watermark |
++-----------------+-----------------------------+-------+-----+--------+-----------+
+| window_start | TIMESTAMP(3) | false | | |
|
+| window_end | TIMESTAMP(3) | false | | |
|
+| window_proctime | TIMESTAMP_LTZ(3) *PROCTIME* | false | | |
|
+| item | STRING | true | | |
|
+| max_price | DOUBLE | true | | |
|
++-----------------+-----------------------------+-------+-----+--------+-----------+
+```
+
+Use the following command to ingest data for `MyTable1` in a terminal:
+
+```
+> nc -lk 9999
+A,1.1
+B,1.2
+A,1.8
+B,2.5
+C,3.8
+```
+
+```sql
+Flink SQL> SET table.local-time-zone=UTC;
+Flink SQL> SELECT * FROM MyView3;
+```
+
+```
++-------------------------+-------------------------+-------------------------+------+-----------+
+| window_start | window_end | window_proctime | item | max_price |
++-------------------------+-------------------------+-------------------------+------+-----------+
+| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:10:00.005
| A | 1.8 |
+| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:10:00.007
| B | 2.5 |
+| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:10:00.007
| C | 3.8 |
++-------------------------+-------------------------+-------------------------+------+-----------+
+```
+
+```sql
+Flink SQL> SET table.local-time-zone=Asia/Shanghai;
+Flink SQL> SELECT * FROM MyView3;
+```
+
+This returns a different window start, window end and window proctime compared to the calculation in the UTC time zone.
+```
++-------------------------+-------------------------+-------------------------+------+-----------+
+| window_start | window_end | window_proctime | item | max_price |
++-------------------------+-------------------------+-------------------------+------+-----------+
+| 2021-04-15 22:00:00.000 | 2021-04-15 22:10:00.000 | 2021-04-15 22:10:00.005
| A | 1.8 |
+| 2021-04-15 22:00:00.000 | 2021-04-15 22:10:00.000 | 2021-04-15 22:10:00.007
| B | 2.5 |
+| 2021-04-15 22:00:00.000 | 2021-04-15 22:10:00.000 | 2021-04-15 22:10:00.007
| C | 3.8 |
++-------------------------+-------------------------+-------------------------+------+-----------+
+```
+
+{{< hint info >}}
+Processing time windows are non-deterministic, so each run yields different windows and different aggregations. The example above only illustrates how the time zone affects a processing time window.
+{{< /hint >}}
+
+### Event Time and Time Zone
+Flink supports defining event time attribute on TIMESTAMP column and
TIMESTAMP_LTZ column.
+
+#### Event Time Attribute on TIMESTAMP
+If the timestamp data in the source is represented as
year-month-day-hour-minute-second, usually a string value without time-zone
information, e.g. `2020-04-15 20:13:40.564`, it's recommended to define the
event time attribute as a `TIMESTAMP` column:
+```sql
+Flink SQL> CREATE TABLE MyTable2 (
+ item STRING,
+ price DOUBLE,
+ ts TIMESTAMP(3), -- TIMESTAMP data type
+ WATERMARK FOR ts AS ts - INTERVAL '10' SECOND
+ ) WITH (
+ 'connector' = 'socket',
+ 'hostname' = '127.0.0.1',
+ 'port' = '9999',
+ 'format' = 'csv'
+ );
+
+Flink SQL> CREATE VIEW MyView4 AS
+ SELECT
+ TUMBLE_START(ts, INTERVAL '10' MINUTES) AS window_start,
+ TUMBLE_END(ts, INTERVAL '10' MINUTES) AS window_end,
+ TUMBLE_ROWTIME(ts, INTERVAL '10' MINUTES) as window_rowtime,
+ item,
+ MAX(price) as max_price
+ FROM MyTable2
+ GROUP BY TUMBLE(ts, INTERVAL '10' MINUTES), item;
+
+Flink SQL> DESC MyView4;
+```
+
+```
++----------------+------------------------+------+-----+--------+-----------+
+| name | type | null | key | extras | watermark |
++----------------+------------------------+------+-----+--------+-----------+
+| window_start | TIMESTAMP(3) | true | | | |
+| window_end | TIMESTAMP(3) | true | | | |
+| window_rowtime | TIMESTAMP(3) *ROWTIME* | true | | | |
+| item | STRING | true | | | |
+| max_price | DOUBLE | true | | | |
++----------------+------------------------+------+-----+--------+-----------+
+```
+
+Use the following command to ingest data for `MyTable2` in a terminal:
+
+```
+> nc -lk 9999
+A,1.1,2021-04-15 14:01:00
+B,1.2,2021-04-15 14:02:00
+A,1.8,2021-04-15 14:03:00
+B,2.5,2021-04-15 14:04:00
+C,3.8,2021-04-15 14:05:00
+C,3.8,2021-04-15 14:11:00
+```
+
+```sql
+Flink SQL> SET table.local-time-zone=UTC;
+Flink SQL> SELECT * FROM MyView4;
+```
+
+```
++-------------------------+-------------------------+-------------------------+------+-----------+
+| window_start | window_end | window_rowtime
| item | max_price |
++-------------------------+-------------------------+-------------------------+------+-----------+
+| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999
| A | 1.8 |
+| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999
| B | 2.5 |
+| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999
| C | 3.8 |
++-------------------------+-------------------------+-------------------------+------+-----------+
+```
+
+```sql
+Flink SQL> SET table.local-time-zone=Asia/Shanghai;
+Flink SQL> SELECT * FROM MyView4;
+```
+
+This returns the same window start, window end and window rowtime as the calculation in the UTC time zone.
+```
++-------------------------+-------------------------+-------------------------+------+-----------+
+| window_start | window_end | window_rowtime
| item | max_price |
++-------------------------+-------------------------+-------------------------+------+-----------+
+| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999
| A | 1.8 |
+| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999
| B | 2.5 |
+| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999
| C | 3.8 |
++-------------------------+-------------------------+-------------------------+------+-----------+
+```
+
+#### Event Time Attribute on TIMESTAMP_LTZ
+If the timestamp data in the source is represented as an epoch time, usually a long value, e.g. `1618989564564`, it's recommended to define the event time attribute as a `TIMESTAMP_LTZ` column.
+```sql
+Flink SQL> CREATE TABLE MyTable3 (
+ item STRING,
+ price DOUBLE,
+ ts BIGINT, -- long time value in epoch milliseconds
+ ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
+ WATERMARK FOR ts_ltz AS ts_ltz - INTERVAL '10' SECOND
+ ) WITH (
+ 'connector' = 'socket',
+ 'hostname' = '127.0.0.1',
+ 'port' = '9999',
+ 'format' = 'csv'
+ );
+
+Flink SQL> CREATE VIEW MyView5 AS
+ SELECT
+ TUMBLE_START(ts_ltz, INTERVAL '10' MINUTES) AS window_start,
+ TUMBLE_END(ts_ltz, INTERVAL '10' MINUTES) AS window_end,
+ TUMBLE_ROWTIME(ts_ltz, INTERVAL '10' MINUTES) as
window_rowtime,
+ item,
+ MAX(price) as max_price
+ FROM MyTable3
+ GROUP BY TUMBLE(ts_ltz, INTERVAL '10' MINUTES), item;
+
+Flink SQL> DESC MyView5;
+```
+
+```
++----------------+----------------------------+-------+-----+--------+-----------+
+| name | type | null | key | extras |
watermark |
++----------------+----------------------------+-------+-----+--------+-----------+
+| window_start | TIMESTAMP(3) | false | | |
|
+| window_end | TIMESTAMP(3) | false | | |
|
+| window_rowtime | TIMESTAMP_LTZ(3) *ROWTIME* | true | | |
|
+| item | STRING | true | | |
|
+| max_price | DOUBLE | true | | |
|
++----------------+----------------------------+-------+-----+--------+-----------+
+```
+
+The input data of MyTable3 is:
+```
+A,1.1,1618495260000 # The corresponding UTC timestamp is 2021-04-15 14:01:00
+B,1.2,1618495320000 # The corresponding UTC timestamp is 2021-04-15 14:02:00
+A,1.8,1618495380000 # The corresponding UTC timestamp is 2021-04-15 14:03:00
+B,2.5,1618495440000 # The corresponding UTC timestamp is 2021-04-15 14:04:00
+C,3.8,1618495500000 # The corresponding UTC timestamp is 2021-04-15 14:05:00
+C,3.8,1618495860000 # The corresponding UTC timestamp is 2021-04-15 14:11:00
+```
+
+```sql
+Flink SQL> SET table.local-time-zone=UTC;
+Flink SQL> SELECT * FROM MyView5;
+```
+
+```
++-------------------------+-------------------------+-------------------------+------+-----------+
+| window_start | window_end | window_rowtime
| item | max_price |
++-------------------------+-------------------------+-------------------------+------+-----------+
+| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999
| A | 1.8 |
+| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999
| B | 2.5 |
+| 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999
| C | 3.8 |
++-------------------------+-------------------------+-------------------------+------+-----------+
+```
+
+```sql
+Flink SQL> SET table.local-time-zone=Asia/Shanghai;
+Flink SQL> SELECT * FROM MyView5;
+```
+
+This returns a different window start, window end and window rowtime compared to the calculation in the UTC time zone.
+```
++-------------------------+-------------------------+-------------------------+------+-----------+
+| window_start | window_end | window_rowtime
| item | max_price |
++-------------------------+-------------------------+-------------------------+------+-----------+
+| 2021-04-15 22:00:00.000 | 2021-04-15 22:10:00.000 | 2021-04-15 22:09:59.999
| A | 1.8 |
+| 2021-04-15 22:00:00.000 | 2021-04-15 22:10:00.000 | 2021-04-15 22:09:59.999
| B | 2.5 |
+| 2021-04-15 22:00:00.000 | 2021-04-15 22:10:00.000 | 2021-04-15 22:09:59.999
| C | 3.8 |
++-------------------------+-------------------------+-------------------------+------+-----------+
+```
+
+## Daylight Saving Time Support
+Flink SQL supports defining time attributes on `TIMESTAMP_LTZ` columns. Based on this, Flink SQL uses the `TIMESTAMP` and `TIMESTAMP_LTZ` types in window processing to support Daylight Saving Time.
+
+
+Flink uses timestamp literals to split windows and assigns windows to data according to the epoch time of each row. This means Flink uses the `TIMESTAMP` type for window start and window end (e.g. `TUMBLE_START` and `TUMBLE_END`) and the `TIMESTAMP_LTZ` type for window time attributes (e.g. `TUMBLE_PROCTIME`, `TUMBLE_ROWTIME`).
+Take a tumble window as an example. Daylight Saving Time in the `America/Los_Angeles` time zone starts at 2021-03-14 02:00:00:
+```
+long epoch1 = 1615708800000L; // 2021-03-14 00:00:00
+long epoch2 = 1615712400000L; // 2021-03-14 01:00:00
+long epoch3 = 1615716000000L; // 2021-03-14 03:00:00, skip one hour (2021-03-14 02:00:00)
+long epoch4 = 1615719600000L; // 2021-03-14 04:00:00
+```
+The tumble window [2021-03-14 00:00:00, 2021-03-14 04:00:00] collects 3 hours of data in the `America/Los_Angeles` time zone, but 4 hours of data in time zones without DST. All users need to do is define the time attribute on a `TIMESTAMP_LTZ` column.
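
The hour counts can be checked with `java.time`. This is a minimal sketch; the class `DstWindowLength` and its methods are hypothetical names. It measures the real elapsed time between the wall-clock values 2021-03-14 00:00:00 and 2021-03-14 04:00:00 in a given zone.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneId;

// Hypothetical helper, for illustration only (not part of Flink).
final class DstWindowLength {
    // Real elapsed time between two wall-clock values interpreted in the given zone.
    static Duration elapsed(LocalDateTime start, LocalDateTime end, String zoneId) {
        ZoneId zone = ZoneId.of(zoneId);
        return Duration.between(start.atZone(zone), end.atZone(zone));
    }

    // Epoch milliseconds of a wall-clock value interpreted in the given zone.
    static long toEpochMillis(LocalDateTime wallClock, String zoneId) {
        return wallClock.atZone(ZoneId.of(zoneId)).toInstant().toEpochMilli();
    }

    public static void main(String[] args) {
        LocalDateTime start = LocalDateTime.of(2021, 3, 14, 0, 0);
        LocalDateTime end = LocalDateTime.of(2021, 3, 14, 4, 0);

        // 02:00-03:00 does not exist in Los Angeles on this day, so only 3 hours elapse.
        System.out.println(elapsed(start, end, "America/Los_Angeles").toHours()); // 3
        System.out.println(elapsed(start, end, "UTC").toHours());                 // 4

        // Matches epoch1 and epoch4 from the listing above.
        System.out.println(toEpochMillis(start, "America/Los_Angeles")); // 1615708800000
        System.out.println(toEpochMillis(end, "America/Los_Angeles"));   // 1615719600000
    }
}
```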
+
+All windows in Flink, such as Hop, Session and Cumulative windows, work this way, and all operations in Flink SQL support `TIMESTAMP_LTZ` well, so Flink handles time zones with Daylight Saving Time gracefully.
+
+
+## Difference between Batch and Streaming Mode
+The values of the following time functions depend on the execution mode:
+* LOCALTIME
+* LOCALTIMESTAMP
+* CURRENT_DATE
+* CURRENT_TIME
+* CURRENT_TIMESTAMP
+* NOW()
+
+They are evaluated for each record in streaming mode. In batch mode, they are evaluated once when the query starts, and the same result is used for every row.
+
+The following time functions are evaluated for each record in both batch and streaming mode:
+
+* CURRENT_ROW_TIMESTAMP()
+* PROCTIME()
+
+{{< top >}}