This is an automated email from the ASF dual-hosted git repository.
yuxiqian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 9e075ca6d [FLINK-39682] Fix unable to invoke TO_DATE with TIMESTAMP types (#4397)
9e075ca6d is described below
commit 9e075ca6db71941b31a10c722c6b8ad69ea9f76d
Author: yuxiqian
AuthorDate: Mon May 18 21:04:56 2026 +0800
[FLINK-39682] Fix unable to invoke TO_DATE with TIMESTAMP types (#4397)
---
docs/content.zh/docs/core-concept/transform.md | 43 ++--
docs/content/docs/core-concept/transform.md | 3 +-
.../cdc/composer/specs/TransformSpecsITCase.java | 1 +
.../src/test/resources/specs/regression.yaml | 275 +++++++++++++++++++++
.../runtime/functions/impl/TemporalFunctions.java | 22 ++
.../parser/metadata/TransformSqlOperatorTable.java | 4 +-
6 files changed, 325 insertions(+), 23 deletions(-)
diff --git a/docs/content.zh/docs/core-concept/transform.md b/docs/content.zh/docs/core-concept/transform.md
index b01274ad0..8dc5ceae0 100644
--- a/docs/content.zh/docs/core-concept/transform.md
+++ b/docs/content.zh/docs/core-concept/transform.md
@@ -172,27 +172,28 @@ Flink CDC uses [Calcite](https://calcite.apache.org/) to parse expressions and
## Temporal Functions
-| Function | Janino Code | Description |
-|------------------------------------------------------|------------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
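-| LOCALTIME | localtime() | Returns the current SQL time in the local time zone; the return type is TIME(0). |
-| LOCALTIMESTAMP | localtimestamp() | Returns the current SQL timestamp in the local time zone; the return type is TIMESTAMP(3). |
-| CURRENT_TIME | currentTime() | Returns the current SQL time in the local time zone; a synonym for LOCALTIME. |
-| CURRENT_DATE | currentDate() | Returns the current SQL date in the local time zone. |
-| CURRENT_TIMESTAMP | currentTimestamp() | Returns the current SQL timestamp in the local time zone; the return type is TIMESTAMP_LTZ(3). |
-| NOW() | now() | Returns the current SQL timestamp in the local time zone; a synonym for CURRENT_TIMESTAMP. |
-| DATE_FORMAT(timestamp, string) | dateFormat(timestamp, string) | Converts a timestamp to a string in the specified date format. The format string is compatible with Java's SimpleDateFormat. |
-| DATE_FORMAT(date, string) | dateFormat(date, string) | Converts the given date to a string in the specified format. The format string is compatible with Java's SimpleDateFormat. |
-| DATE_FORMAT(time, string) | dateFormat(time, string) | Converts the given time to a string in the specified format. The format string is compatible with Java's SimpleDateFormat. |
-| DATE_FORMAT_TZ(timestamp, format, timezone) | dateFormatTz(timestamp, format, timezone) | Formats a timestamp or datetime value as a string using the given pattern and the specified time zone. The timezone argument can be a time zone ID (for example, 'UTC' or 'Asia/Shanghai') or an offset (such as '+08:00'). |
-| TIMESTAMPADD(timeintervalunit, interval, timepoint) | timestampadd(timeintervalunit, interval, timepoint) | Returns the timestamp obtained by adding interval to timepoint. The unit of the interval is given by the first argument and should be one of the following values: SECOND, MINUTE, HOUR, DAY, MONTH, or YEAR. |
-| TIMESTAMPDIFF(timepointunit, timepoint1, timepoint2) | timestampDiff(timepointunit, timepoint1, timepoint2) | Returns the (signed) number of time units between timepoint1 and timepoint2. The unit of the interval is given by the first argument and should be one of the following values: SECOND, MINUTE, HOUR, DAY, MONTH, or YEAR. |
-| TO_DATE(string1[, string2]) | toDate(string1[, string2]) | Converts the date string string1 with format string2 (default 'yyyy-MM-dd') to a date. |
-| TO_TIMESTAMP(string1[, string2]) | toTimestamp(string1[, string2]) | Converts the datetime string string1 with format string2 (default 'yyyy-MM-dd HH:mm:ss') to a timestamp without time zone. |
-| TO_TIMESTAMP_LTZ(string1[, string2]) | toTimestampLtz(string1[, string2]) | Converts the datetime string string1 with format string2 (default 'yyyy-MM-dd HH:mm:ss') to a timestamp with local time zone. |
-| FROM_UNIXTIME(numeric[, string]) | fromUnixtime(NUMERIC[, STRING]) | Returns a string representation of the numeric argument (default format 'yyyy-MM-dd HH:mm:ss'). numeric is an internal timestamp value representing the number of seconds since '1970-01-01 00:00:00' UTC, such as the value produced by UNIX_TIMESTAMP(). The return value is expressed in the session time zone (specified in TableConfig). For example, FROM_UNIXTIME(44) returns '1970-01-01 00:00:44' in the UTC time zone, but '1970-01-01 09:00:44' in the 'Asia/Tokyo' time zone. |
-| UNIX_TIMESTAMP() | unixTimestamp() | Gets the current Unix timestamp in seconds. This function is non-deterministic: the value is recomputed for every record. |
-| UNIX_TIMESTAMP(string1[, string2]) | unixTimestamp(STRING1[, STRING2]) | Converts the datetime string string1 with format string2 (yyyy-MM-dd HH:mm:ss if not specified) to a Unix timestamp in seconds, using the time zone specified in the table config.<br/>If the datetime string specifies a time zone and is parsed with a UTC+X pattern (such as "yyyy-MM-dd HH:mm:ss.SSS X"), this function uses the time zone from the datetime string instead of the one in the table config. If the datetime string cannot be parsed, the default value Long.MIN_VALUE (-9223372036854775808) is returned. |
-| DATE_ADD(date, int) | dateAdd(date, int) | Adds N days to the given date and returns a string in the format 'yyyy-MM-dd'. |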
+| Function | Janino Code | Description |
+|------------------------------------------------------|------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
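+| LOCALTIME | localtime() | Returns the current SQL time in the local time zone; the return type is TIME(0). |
+| LOCALTIMESTAMP | localtimestamp() | Returns the current SQL timestamp in the local time zone; the return type is TIMESTAMP(3). |
+| CURRENT_TIME | currentTime() | Returns the current SQL time in the local time zone; a synonym for LOCALTIME. |
+| CURRENT_DATE | currentDate() | Returns the current SQL date in the local time zone. |
+| CURRENT_TIMESTAMP | currentTimestamp() | Returns the current SQL timestamp in the local time zone; the return type is TIMESTAMP_LTZ(3). |
+| NOW() | now() | Returns the current SQL timestamp in the local time zone; a synonym for CURRENT_TIMESTAMP. |
+| DATE_FORMAT(timestamp, string) | dateFormat(timestamp, string) | Converts a timestamp to a string in the specified date format. The format string is compatible with Java's SimpleDateFormat. |
+| DATE_FORMAT(date, string) | dateFormat(date, string) | Converts the given date to a string in the specified format. The format string is compatible with Java's SimpleDateFormat. |
+| DATE_FORMAT(time, string) | dateFormat(time, string) | Converts the given time to a string in the specified format. The format string is compatible with Java's SimpleDateFormat. |
+| DATE_FORMAT_TZ(timestamp, format, timezone) | dateFormatTz(timestamp, format, timezone) | Formats a timestamp or datetime value as a string using the given pattern and the specified time zone. The timezone argument can be a time zone ID (for example, 'UTC' or 'Asia/Shanghai') or an offset (such as '+08:00'). |
+| TIMESTAMPADD(timeintervalunit, interval, timepoint) | timestampadd(timeintervalunit, interval, timepoint) | Returns the timestamp obtained by adding interval to timepoint. The unit of the interval is given by the first argument and should be one of the following values: SECOND, MINUTE, HOUR, DAY, MONTH, or YEAR. |
+| TIMESTAMPDIFF(timepointunit, timepoint1, timepoint2) | timestampDiff(timepointunit, timepoint1, timepoint2) | Returns the (signed) number of time units between timepoint1 and timepoint2. The unit of the interval is given by the first argument and should be one of the following values: SECOND, MINUTE, HOUR, DAY, MONTH, or YEAR. |
+| TO_DATE(string1[, string2]) | toDate(string1[, string2]) | Converts the date string string1 with format string2 (default 'yyyy-MM-dd') to a date. The format argument applies only to this string overload. |
+| TO_DATE(timestamp) | toDate(timestamp) | Converts a TIMESTAMP, TIMESTAMP with time zone (TIMESTAMP_TZ), or TIMESTAMP_LTZ value to a DATE according to that value's calendar date (for TIMESTAMP_LTZ, the calendar date in UTC). This overload does not support a format pattern; to parse a formatted string, use `TO_DATE(string1[, string2])`. |
+| TO_TIMESTAMP(string1[, string2]) | toTimestamp(string1[, string2]) | Converts the datetime string string1 with format string2 (default 'yyyy-MM-dd HH:mm:ss') to a timestamp without time zone. |
+| TO_TIMESTAMP_LTZ(string1[, string2]) | toTimestampLtz(string1[, string2]) | Converts the datetime string string1 with format string2 (default 'yyyy-MM-dd HH:mm:ss') to a timestamp with local time zone. |
+| FROM_UNIXTIME(numeric[, string]) | fromUnixtime(NUMERIC[, STRING]) | Returns a string representation of the numeric argument (default format 'yyyy-MM-dd HH:mm:ss'). numeric is an internal timestamp value representing the number of seconds since '1970-01-01 00:00:00' UTC, such as the value produced by UNIX_TIMESTAMP(). The return value is expressed in the session time zone (specified in TableConfig). For example, FROM_UNIXTIME(44) returns '1970-01-01 00:00:44' in the UTC time zone, but '1970-01-01 09:00:44' in the 'Asia/Tokyo' time zone. |
+| UNIX_TIMESTAMP() | unixTimestamp() | Gets the current Unix timestamp in seconds. This function is non-deterministic: the value is recomputed for every record. |
+| UNIX_TIMESTAMP(string1[, string2]) | unixTimestamp(STRING1[, STRING2]) | Converts the datetime string string1 with format string2 (yyyy-MM-dd HH:mm:ss if not specified) to a Unix timestamp in seconds, using the time zone specified in the table config.<br/>If the datetime string specifies a time zone and is parsed with a UTC+X pattern (such as "yyyy-MM-dd HH:mm:ss.SSS X"), this function uses the time zone from the datetime string instead of the one in the table config. If the datetime string cannot be parsed, the default value Long.MIN_VALUE (-9223372036854775808) is returned. |
+| DATE_ADD(date, int) | dateAdd(date, int) | Adds N days to the given date and returns a string in the format 'yyyy-MM-dd'. |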
## Conditional Functions
diff --git a/docs/content/docs/core-concept/transform.md b/docs/content/docs/core-concept/transform.md
index 5df6dbcf9..4a6d40b61 100644
--- a/docs/content/docs/core-concept/transform.md
+++ b/docs/content/docs/core-concept/transform.md
@@ -187,7 +187,8 @@ Flink CDC uses [Calcite](https://calcite.apache.org/) to parse expressions and [
| DATE_FORMAT_TZ(timestamp, format, timezone) | dateFormatTz(timestamp, format, timezone) | Formats a timestamp or datetime value as a string using the given pattern and the specified time zone. The timezone argument can be a time zone ID (for example, 'UTC', 'Asia/Shanghai') or an offset such as '+08:00'. [...]
| TIMESTAMPADD(timeintervalunit, interval, timepoint) | timestampadd(timeintervalunit, interval, timepoint) | Returns the timestamp obtained by adding interval to timepoint. The unit for the interval is given by the first argument, which should be one of the following values: SECOND, MINUTE, HOUR, DAY, MONTH, or YEAR. [...]
| TIMESTAMPDIFF(timepointunit, timepoint1, timepoint2) | timestampDiff(timepointunit, timepoint1, timepoint2) | Returns the (signed) number of timepointunit between timepoint1 and timepoint2. The unit for the interval is given by the first argument, which should be one of the following values: SECOND, MINUTE, HOUR, DAY, MONTH, or YEAR. [...]
-| TO_DATE(string1[, string2]) | toDate(string1[, string2]) | Converts a date string string1 with format string2 (by default 'yyyy-MM-dd') to a date. [...]
+| TO_DATE(string1[, string2]) | toDate(string1[, string2]) | Converts a date string string1 with format string2 (by default 'yyyy-MM-dd') to a date. The format argument applies only to this string overload. [...]
+| TO_DATE(timestamp) | toDate(timestamp) | Converts a TIMESTAMP, TIMESTAMP WITH TIME ZONE (TIMESTAMP_TZ), or TIMESTAMP_LTZ value to a DATE using that value's calendar date (for TIMESTAMP_LTZ, the calendar date in UTC). A format pattern is not supported for this overload; use `TO_DATE(string1[, string2])` to parse formatted strings. [...]
| TO_TIMESTAMP(string1[, string2]) | toTimestamp(string1[, string2]) | Converts date time string string1 with format string2 (by default: 'yyyy-MM-dd HH:mm:ss') to a timestamp, without time zone. [...]
| TO_TIMESTAMP_LTZ(string1[, string2]) | toTimestampLtz(string1[, string2]) | Converts date time string string1 with format string2 (by default: 'yyyy-MM-dd HH:mm:ss') to a timestamp, with local time zone. [...]
| FROM_UNIXTIME(numeric[, string]) | fromUnixtime(NUMERIC[, STRING]) | Returns a representation of the numeric argument as a value in string format (default is ‘yyyy-MM-dd HH:mm:ss’). numeric is an internal timestamp value representing seconds since ‘1970-01-01 00:00:00’ UTC, such as produced by the UNIX_TIMESTAMP() function. The return value is expressed in the session time zone (specified in TableConfig). E.g., FROM_UNIXTIME(44) returns ‘1970-01 [...]
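The phrase "that value's calendar date" is easiest to see on concrete values. Below is a minimal sketch, not the Flink CDC implementation. It assumes TIMESTAMP is carried as `java.time.LocalDateTime` and TIMESTAMP_TZ as `java.time.ZonedDateTime`, which are the parameter types of the new `TemporalFunctions` overloads in this commit. The class and method names are hypothetical. The sketch contrasts the value's own date with the date a session-zone conversion would produce, using one row of the TIMESTAMP_TZ regression case (pipeline time zone `America/Los_Angeles`).

```java
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;

/** Hypothetical illustration of "that value's calendar date" for zone-independent types. */
class CalendarDateSketch {

    // TIMESTAMP: no zone is involved, so the date is the wall-clock date.
    static LocalDate dateOf(LocalDateTime timestamp) {
        return timestamp == null ? null : timestamp.toLocalDate();
    }

    // TIMESTAMP_TZ: the date is read in the value's own zone or offset.
    static LocalDate dateOf(ZonedDateTime timestampTz) {
        return timestampTz == null ? null : timestampTz.toLocalDate();
    }

    // For contrast only: converting to a session zone first can change the date,
    // which TO_DATE(timestamp) is documented not to do.
    static LocalDate dateInZone(ZonedDateTime timestampTz, ZoneId sessionZone) {
        return timestampTz.withZoneSameInstant(sessionZone).toLocalDate();
    }
}
```

For `1970-01-02T10:17:36+08:00`, `dateOf` gives `1970-01-02`. By contrast, `dateInZone(..., ZoneId.of("America/Los_Angeles"))` gives `1970-01-01`, because that instant falls on the previous evening in Los Angeles.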
diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/specs/TransformSpecsITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/specs/TransformSpecsITCase.java
index 21e7ddc5d..0d8079e71 100644
--- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/specs/TransformSpecsITCase.java
+++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/specs/TransformSpecsITCase.java
@@ -511,6 +511,7 @@ class TransformSpecsITCase {
"specs/logical.yaml",
"specs/meta.yaml",
"specs/nested.yaml",
+ "specs/regression.yaml",
"specs/string.yaml",
"specs/temporal.yaml"
};
diff --git a/flink-cdc-composer/src/test/resources/specs/regression.yaml b/flink-cdc-composer/src/test/resources/specs/regression.yaml
new file mode 100644
index 000000000..3df082eeb
--- /dev/null
+++ b/flink-cdc-composer/src/test/resources/specs/regression.yaml
@@ -0,0 +1,275 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+- do: FLINK-39682 (Invoke TO_DATE with TIMESTAMP)
+ time-zone: Asia/Tokyo
+ projection: |-
+ id_
+ timestamp_0_
+ TO_DATE(timestamp_0_) AS comp_1
+ TIMESTAMPADD(HOUR, 18, timestamp_0_) AS comp_2
+ TO_DATE(TIMESTAMPADD(HOUR, 18, timestamp_0_)) AS comp_3
+ timestamp_6_
+ TO_DATE(timestamp_6_) AS comp_4
+ TIMESTAMPADD(HOUR, 18, timestamp_6_) AS comp_5
+ TO_DATE(TIMESTAMPADD(HOUR, 18, timestamp_6_)) AS comp_6
+ timestamp_9_
+ TO_DATE(timestamp_9_) AS comp_7
+ TIMESTAMPADD(HOUR, 18, timestamp_9_) AS comp_8
+ TO_DATE(TIMESTAMPADD(HOUR, 18, timestamp_9_)) AS comp_9
+ filter: id_ > 0
+ primary-key: id_
+ # timestamp_0_ is: 1970-01-02 10:17:36, TO_DATE should yield 1970-01-02
+ # after +18 hours: 1970-01-03 04:17:36, TO_DATE should yield 1970-01-03
+ #
+ # timestamp_6_ is: 1970-01-03 17:09:27, TO_DATE should yield 1970-01-03
+ # after +18 hours: 1970-01-04 11:09:27, TO_DATE should yield 1970-01-04
+ #
+ # timestamp_9_ is: 1970-01-05 00:01:18, TO_DATE should yield 1970-01-05
+ # after +18 hours: 1970-01-05 18:01:18, TO_DATE should yield 1970-01-05
+ #
+ # None of them are affected by pipeline time-zone config.
+ expect: |-
+ CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT NULL 'Identifier',`timestamp_0_` TIMESTAMP(0),`comp_1` DATE,`comp_2` TIMESTAMP(0),`comp_3` DATE,`timestamp_6_` TIMESTAMP(6),`comp_4` DATE,`comp_5` TIMESTAMP(3),`comp_6` DATE,`timestamp_9_` TIMESTAMP(9),`comp_7` DATE,`comp_8` TIMESTAMP(3),`comp_9` DATE}, primaryKeys=id_, options=()}
+ DataChangeEvent{tableId=foo.bar.baz, before=[], after=[1, 1970-01-02T10:17:36.789123456, 1970-01-02, 1970-01-03T04:17:36.789, 1970-01-03, 1970-01-03T17:09:27.891234561, 1970-01-03, 1970-01-04T11:09:27.891, 1970-01-04, 1970-01-05T00:01:18.912345612, 1970-01-05, 1970-01-05T18:01:18.912, 1970-01-05], op=INSERT, meta=()}
+ DataChangeEvent{tableId=foo.bar.baz, before=[1, 1970-01-02T10:17:36.789123456, 1970-01-02, 1970-01-03T04:17:36.789, 1970-01-03, 1970-01-03T17:09:27.891234561, 1970-01-03, 1970-01-04T11:09:27.891, 1970-01-04, 1970-01-05T00:01:18.912345612, 1970-01-05, 1970-01-05T18:01:18.912, 1970-01-05], after=[], op=DELETE, meta=()}
+
+- do: FLINK-39682 (Invoke TO_DATE with TIMESTAMP_TZ)
+ time-zone: America/Los_Angeles
+ projection: |-
+ id_
+ timestamp_tz_0_
+ TO_DATE(timestamp_tz_0_) AS comp_1
+ timestamp_tz_6_
+ TO_DATE(timestamp_tz_6_) AS comp_2
+ timestamp_tz_9_
+ TO_DATE(timestamp_tz_9_) AS comp_3
+ filter: id_ > 0
+ non-null: 'true'
+ primary-key: id_
+ # timestamp_tz_0_ is: 1970-01-02 10:17:36.789123456 (Asia/Shanghai, UTC+8), TO_DATE should yield 1970-01-02
+ # timestamp_tz_6_ is: 1970-01-03 17:09:27.891234561 (Europe/Berlin, UTC+1), TO_DATE should yield 1970-01-03
+ # timestamp_tz_9_ is: 1970-01-05 00:01:18.912345612 (America/Puerto_Rico, UTC-4), TO_DATE should yield 1970-01-05
+ # None of them are affected by pipeline time-zone config.
+ expect: |-
+ CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT NULL 'Identifier',`timestamp_tz_0_` TIMESTAMP(0) WITH TIME ZONE,`comp_1` DATE,`timestamp_tz_6_` TIMESTAMP(6) WITH TIME ZONE,`comp_2` DATE,`timestamp_tz_9_` TIMESTAMP(9) WITH TIME ZONE,`comp_3` DATE}, primaryKeys=id_, options=()}
+ DataChangeEvent{tableId=foo.bar.baz, before=[], after=[1, 1970-01-02T10:17:36.789123456+08:00, 1970-01-02, 1970-01-03T17:09:27.891234561+01:00, 1970-01-03, 1970-01-05T00:01:18.912345612-04:00, 1970-01-05], op=INSERT, meta=()}
+ DataChangeEvent{tableId=foo.bar.baz, before=[1, 1970-01-02T10:17:36.789123456+08:00, 1970-01-02, 1970-01-03T17:09:27.891234561+01:00, 1970-01-03, 1970-01-05T00:01:18.912345612-04:00, 1970-01-05], after=[], op=DELETE, meta=()}
+
+- do: FLINK-39682 (Invoke TO_DATE with TIMESTAMP_LTZ)
+ time-zone: Australia/Sydney
+ projection: |-
+ id_
+ timestamp_ltz_0_
+ TO_DATE(timestamp_ltz_0_) AS comp_1
+ TIMESTAMPADD(HOUR, 18, timestamp_ltz_0_) AS comp_2
+ TO_DATE(TIMESTAMPADD(HOUR, 18, timestamp_ltz_0_)) AS comp_3
+ timestamp_ltz_6_
+ TO_DATE(timestamp_ltz_6_) AS comp_4
+ TIMESTAMPADD(HOUR, 18, timestamp_ltz_6_) AS comp_5
+ TO_DATE(TIMESTAMPADD(HOUR, 18, timestamp_ltz_6_)) AS comp_6
+ timestamp_ltz_9_
+ TO_DATE(timestamp_ltz_9_) AS comp_7
+ TIMESTAMPADD(HOUR, 18, timestamp_ltz_9_) AS comp_8
+ TO_DATE(TIMESTAMPADD(HOUR, 18, timestamp_ltz_9_)) AS comp_9
+ filter: id_ > 0
+ primary-key: id_
+ # timestamp_ltz_0_ is: 1970-01-02 10:17:36 (UTC), TO_DATE should yield 1970-01-02
+ # after +18 hours: 1970-01-03 04:17:36 (UTC), TO_DATE should yield 1970-01-03
+ #
+ # timestamp_ltz_6_ is: 1970-01-03 17:09:27 (UTC), TO_DATE should yield 1970-01-03
+ # after +18 hours: 1970-01-04 11:09:27 (UTC), TO_DATE should yield 1970-01-04
+ #
+ # timestamp_ltz_9_ is: 1970-01-05 00:01:18 (UTC), TO_DATE should yield 1970-01-05
+ # after +18 hours: 1970-01-05 18:01:18 (UTC), TO_DATE should yield 1970-01-05
+ #
+ # None of them are affected by pipeline time-zone config.
+ expect: |-
+ CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT NULL 'Identifier',`timestamp_ltz_0_` TIMESTAMP_LTZ(0),`comp_1` DATE,`comp_2` TIMESTAMP_LTZ(0),`comp_3` DATE,`timestamp_ltz_6_` TIMESTAMP_LTZ(6),`comp_4` DATE,`comp_5` TIMESTAMP_LTZ(3),`comp_6` DATE,`timestamp_ltz_9_` TIMESTAMP_LTZ(9),`comp_7` DATE,`comp_8` TIMESTAMP_LTZ(3),`comp_9` DATE}, primaryKeys=id_, options=()}
+ DataChangeEvent{tableId=foo.bar.baz, before=[], after=[1, 1970-01-02T10:17:36.789123456, 1970-01-02, 1970-01-03T04:17:36.789, 1970-01-03, 1970-01-03T17:09:27.891234561, 1970-01-03, 1970-01-04T11:09:27.891, 1970-01-04, 1970-01-05T00:01:18.912345612, 1970-01-05, 1970-01-05T18:01:18.912, 1970-01-05], op=INSERT, meta=()}
+ DataChangeEvent{tableId=foo.bar.baz, before=[1, 1970-01-02T10:17:36.789123456, 1970-01-02, 1970-01-03T04:17:36.789, 1970-01-03, 1970-01-03T17:09:27.891234561, 1970-01-03, 1970-01-04T11:09:27.891, 1970-01-04, 1970-01-05T00:01:18.912345612, 1970-01-05, 1970-01-05T18:01:18.912, 1970-01-05], after=[], op=DELETE, meta=()}
+
+- do: FLINK-39682 (Invoke TO_DATE with TIMESTAMP_LTZ, 24 hour offsets)
+ time-zone: Pacific/Tarawa
+ projection: |-
+ id_
+ timestamp_ltz_9_
+ TO_DATE(timestamp_ltz_9_) AS date_orig
+ TIMESTAMPADD(HOUR, -12, timestamp_ltz_9_) AS shift_m12
+ TO_DATE(TIMESTAMPADD(HOUR, -12, timestamp_ltz_9_)) AS date_m12
+ TIMESTAMPADD(HOUR, -11, timestamp_ltz_9_) AS shift_m11
+ TO_DATE(TIMESTAMPADD(HOUR, -11, timestamp_ltz_9_)) AS date_m11
+ TIMESTAMPADD(HOUR, -10, timestamp_ltz_9_) AS shift_m10
+ TO_DATE(TIMESTAMPADD(HOUR, -10, timestamp_ltz_9_)) AS date_m10
+ TIMESTAMPADD(HOUR, -9, timestamp_ltz_9_) AS shift_m9
+ TO_DATE(TIMESTAMPADD(HOUR, -9, timestamp_ltz_9_)) AS date_m9
+ TIMESTAMPADD(HOUR, -8, timestamp_ltz_9_) AS shift_m8
+ TO_DATE(TIMESTAMPADD(HOUR, -8, timestamp_ltz_9_)) AS date_m8
+ TIMESTAMPADD(HOUR, -7, timestamp_ltz_9_) AS shift_m7
+ TO_DATE(TIMESTAMPADD(HOUR, -7, timestamp_ltz_9_)) AS date_m7
+ TIMESTAMPADD(HOUR, -6, timestamp_ltz_9_) AS shift_m6
+ TO_DATE(TIMESTAMPADD(HOUR, -6, timestamp_ltz_9_)) AS date_m6
+ TIMESTAMPADD(HOUR, -5, timestamp_ltz_9_) AS shift_m5
+ TO_DATE(TIMESTAMPADD(HOUR, -5, timestamp_ltz_9_)) AS date_m5
+ TIMESTAMPADD(HOUR, -4, timestamp_ltz_9_) AS shift_m4
+ TO_DATE(TIMESTAMPADD(HOUR, -4, timestamp_ltz_9_)) AS date_m4
+ TIMESTAMPADD(HOUR, -3, timestamp_ltz_9_) AS shift_m3
+ TO_DATE(TIMESTAMPADD(HOUR, -3, timestamp_ltz_9_)) AS date_m3
+ TIMESTAMPADD(HOUR, -2, timestamp_ltz_9_) AS shift_m2
+ TO_DATE(TIMESTAMPADD(HOUR, -2, timestamp_ltz_9_)) AS date_m2
+ TIMESTAMPADD(HOUR, -1, timestamp_ltz_9_) AS shift_m1
+ TO_DATE(TIMESTAMPADD(HOUR, -1, timestamp_ltz_9_)) AS date_m1
+ TIMESTAMPADD(HOUR, 0, timestamp_ltz_9_) AS shift_p0
+ TO_DATE(TIMESTAMPADD(HOUR, 0, timestamp_ltz_9_)) AS date_p0
+ TIMESTAMPADD(HOUR, 1, timestamp_ltz_9_) AS shift_p1
+ TO_DATE(TIMESTAMPADD(HOUR, 1, timestamp_ltz_9_)) AS date_p1
+ TIMESTAMPADD(HOUR, 2, timestamp_ltz_9_) AS shift_p2
+ TO_DATE(TIMESTAMPADD(HOUR, 2, timestamp_ltz_9_)) AS date_p2
+ TIMESTAMPADD(HOUR, 3, timestamp_ltz_9_) AS shift_p3
+ TO_DATE(TIMESTAMPADD(HOUR, 3, timestamp_ltz_9_)) AS date_p3
+ TIMESTAMPADD(HOUR, 4, timestamp_ltz_9_) AS shift_p4
+ TO_DATE(TIMESTAMPADD(HOUR, 4, timestamp_ltz_9_)) AS date_p4
+ TIMESTAMPADD(HOUR, 5, timestamp_ltz_9_) AS shift_p5
+ TO_DATE(TIMESTAMPADD(HOUR, 5, timestamp_ltz_9_)) AS date_p5
+ TIMESTAMPADD(HOUR, 6, timestamp_ltz_9_) AS shift_p6
+ TO_DATE(TIMESTAMPADD(HOUR, 6, timestamp_ltz_9_)) AS date_p6
+ TIMESTAMPADD(HOUR, 7, timestamp_ltz_9_) AS shift_p7
+ TO_DATE(TIMESTAMPADD(HOUR, 7, timestamp_ltz_9_)) AS date_p7
+ TIMESTAMPADD(HOUR, 8, timestamp_ltz_9_) AS shift_p8
+ TO_DATE(TIMESTAMPADD(HOUR, 8, timestamp_ltz_9_)) AS date_p8
+ TIMESTAMPADD(HOUR, 9, timestamp_ltz_9_) AS shift_p9
+ TO_DATE(TIMESTAMPADD(HOUR, 9, timestamp_ltz_9_)) AS date_p9
+ TIMESTAMPADD(HOUR, 10, timestamp_ltz_9_) AS shift_p10
+ TO_DATE(TIMESTAMPADD(HOUR, 10, timestamp_ltz_9_)) AS date_p10
+ TIMESTAMPADD(HOUR, 11, timestamp_ltz_9_) AS shift_p11
+ TO_DATE(TIMESTAMPADD(HOUR, 11, timestamp_ltz_9_)) AS date_p11
+ filter: id_ > 0
+ primary-key: id_
+ # timestamp_ltz_9_ is: 1970-01-05T00:01:18.912345612 (UTC), epoch=345678912ms
+ # pipeline time-zone is Pacific/Tarawa (UTC+12).
+ #
+ # TIMESTAMPADD(HOUR, N, ...) always operates in UTC, producing 24 time points
+ # spanning all 24 UTC hours, straddling the midnight boundary:
+ #
+ # N=-12 → 1970-01-04T12:01:18.912 UTC (UTC+12 local: 1970-01-05T00:01:18) → TO_DATE = 1970-01-04
+ # N=-11 → 1970-01-04T13:01:18.912 UTC (UTC+12 local: 1970-01-05T01:01:18) → TO_DATE = 1970-01-04
+ # N=-10 → 1970-01-04T14:01:18.912 UTC (UTC+12 local: 1970-01-05T02:01:18) → TO_DATE = 1970-01-04
+ # ...
+ # N= -1 → 1970-01-04T23:01:18.912 UTC (UTC+12 local: 1970-01-05T11:01:18) → TO_DATE = 1970-01-04
+ # N= 0 → 1970-01-05T00:01:18.912 UTC (UTC+12 local: 1970-01-05T12:01:18) → TO_DATE = 1970-01-05
+ # ...
+ # N= 11 → 1970-01-05T11:01:18.912 UTC (UTC+12 local: 1970-01-05T23:01:18) → TO_DATE = 1970-01-05
+ #
+ # TO_DATE always uses UTC, so N=-12..-1 → 1970-01-04, N=0..+11 → 1970-01-05.
+ # A buggy implementation using pipeline time-zone (UTC+12) would return 1970-01-05 for all 24 entries.
+ expect: |-
+ CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT NULL 'Identifier',`timestamp_ltz_9_` TIMESTAMP_LTZ(9),`date_orig` DATE,`shift_m12` TIMESTAMP_LTZ(3),`date_m12` DATE,`shift_m11` TIMESTAMP_LTZ(3),`date_m11` DATE,`shift_m10` TIMESTAMP_LTZ(3),`date_m10` DATE,`shift_m9` TIMESTAMP_LTZ(3),`date_m9` DATE,`shift_m8` TIMESTAMP_LTZ(3),`date_m8` DATE,`shift_m7` TIMESTAMP_LTZ(3),`date_m7` DATE,`shift_m6` TIMESTAMP_LTZ(3),`date_m6` DATE,`shift_m5` TIMESTAMP_LTZ(3),`date_m5` D [...]
+ DataChangeEvent{tableId=foo.bar.baz, before=[], after=[1, 1970-01-05T00:01:18.912345612, 1970-01-05, 1970-01-04T12:01:18.912, 1970-01-04, 1970-01-04T13:01:18.912, 1970-01-04, 1970-01-04T14:01:18.912, 1970-01-04, 1970-01-04T15:01:18.912, 1970-01-04, 1970-01-04T16:01:18.912, 1970-01-04, 1970-01-04T17:01:18.912, 1970-01-04, 1970-01-04T18:01:18.912, 1970-01-04, 1970-01-04T19:01:18.912, 1970-01-04, 1970-01-04T20:01:18.912, 1970-01-04, 1970-01-04T21:01:18.912, 1970-01-04, 1970-01-04T22:01: [...]
+ DataChangeEvent{tableId=foo.bar.baz, before=[1, 1970-01-05T00:01:18.912345612, 1970-01-05, 1970-01-04T12:01:18.912, 1970-01-04, 1970-01-04T13:01:18.912, 1970-01-04, 1970-01-04T14:01:18.912, 1970-01-04, 1970-01-04T15:01:18.912, 1970-01-04, 1970-01-04T16:01:18.912, 1970-01-04, 1970-01-04T17:01:18.912, 1970-01-04, 1970-01-04T18:01:18.912, 1970-01-04, 1970-01-04T19:01:18.912, 1970-01-04, 1970-01-04T20:01:18.912, 1970-01-04, 1970-01-04T21:01:18.912, 1970-01-04, 1970-01-04T22:01:18.912, 19 [...]
+
+- do: TO_DATE chained with FROM_UNIXTIME (common CDC pattern)
+ projection: |-
+ id_
+ TO_DATE(FROM_UNIXTIME(bigint_)) AS comp_1
+ TO_DATE(FROM_UNIXTIME(bigint_, 'yyyy-MM-dd')) AS comp_2
+ primary-key: id_
+ # bigint_ = 5 for record1 => FROM_UNIXTIME(5) = "1970-01-01 00:00:05"
+ # TO_DATE("1970-01-01 00:00:05") = null (format mismatch: contains time, TO_DATE expects 'yyyy-MM-dd')
+ # FROM_UNIXTIME(5, 'yyyy-MM-dd') = "1970-01-01", TO_DATE("1970-01-01") = 1970-01-01
+ # bigint_ = -5 for record2 => FROM_UNIXTIME(-5) = "1969-12-31 23:59:55"
+ # TO_DATE("1969-12-31 23:59:55") = null (same reason)
+ # FROM_UNIXTIME(-5, 'yyyy-MM-dd') = "1969-12-31", TO_DATE("1969-12-31") = 1969-12-31
+ # NOTE: Customers should always use FROM_UNIXTIME(x, 'yyyy-MM-dd') when chaining with TO_DATE
+ expect: |-
+ CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT NULL 'Identifier',`comp_1` DATE,`comp_2` DATE}, primaryKeys=id_, options=()}
+ DataChangeEvent{tableId=foo.bar.baz, before=[], after=[1, null, 1970-01-01], op=INSERT, meta=()}
+ DataChangeEvent{tableId=foo.bar.baz, before=[1, null, 1970-01-01], after=[-1, null, 1969-12-31], op=UPDATE, meta=()}
+ DataChangeEvent{tableId=foo.bar.baz, before=[-1, null, 1969-12-31], after=[], op=DELETE, meta=()}
+ DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, null], op=INSERT, meta=()}
+ DataChangeEvent{tableId=foo.bar.baz, before=[0, null, null], after=[], op=DELETE, meta=()}
+
+- do: TO_DATE chained with DATE_FORMAT (extract date from timestamp)
+ projection: |-
+ id_
+ TO_DATE(DATE_FORMAT(timestamp_0_, 'yyyy-MM-dd')) AS comp_1
+ TO_DATE(DATE_FORMAT(timestamp_0_, 'yyyy-MM-01')) AS comp_2
+ primary-key: id_
+ # timestamp_0_ record1 = 1970-01-02T10:17:36
+ # DATE_FORMAT(..., 'yyyy-MM-dd') = "1970-01-02", TO_DATE = 1970-01-02
+ # DATE_FORMAT(..., 'yyyy-MM-01') = "1970-01-01", TO_DATE = 1970-01-01
+ # timestamp_0_ record2 = 1970-01-09T08:57:36
+ # DATE_FORMAT(..., 'yyyy-MM-dd') = "1970-01-09", TO_DATE = 1970-01-09
+ # DATE_FORMAT(..., 'yyyy-MM-01') = "1970-01-01", TO_DATE = 1970-01-01
+ expect: |-
+ CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT NULL 'Identifier',`comp_1` DATE,`comp_2` DATE}, primaryKeys=id_, options=()}
+ DataChangeEvent{tableId=foo.bar.baz, before=[], after=[1, 1970-01-02, 1970-01-01], op=INSERT, meta=()}
+ DataChangeEvent{tableId=foo.bar.baz, before=[1, 1970-01-02, 1970-01-01], after=[-1, 1970-01-09, 1970-01-01], op=UPDATE, meta=()}
+ DataChangeEvent{tableId=foo.bar.baz, before=[-1, 1970-01-09, 1970-01-01], after=[], op=DELETE, meta=()}
+ DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, null], op=INSERT, meta=()}
+ DataChangeEvent{tableId=foo.bar.baz, before=[0, null, null], after=[], op=DELETE, meta=()}
+
+- do: TO_DATE with CASE WHEN null-guard pattern (common CDC pattern)
+ projection: |-
+ id_
+ CASE WHEN timestamp_0_ IS NULL THEN TO_DATE('2000-01-01') ELSE TO_DATE(timestamp_0_) END AS comp_1
+ CASE WHEN bigint_ IS NULL THEN TO_DATE('2000-01-01') ELSE TO_DATE(FROM_UNIXTIME(bigint_, 'yyyy-MM-dd')) END AS comp_2
+ primary-key: id_
+ # record1: timestamp_0_ not null => TO_DATE(1970-01-02T10:17:36) = 1970-01-02
+ # bigint_=5, not null => TO_DATE(FROM_UNIXTIME(5,'yyyy-MM-dd')) = TO_DATE("1970-01-01") = 1970-01-01
+ # record2: timestamp_0_ not null => TO_DATE(1970-01-09T08:57:36) = 1970-01-09
+ # bigint_=-5, not null => TO_DATE(FROM_UNIXTIME(-5,'yyyy-MM-dd')) = TO_DATE("1969-12-31") = 1969-12-31
+ # record3: both null => TO_DATE('2000-01-01') = 2000-01-01
+ expect: |-
+ CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT NULL 'Identifier',`comp_1` DATE,`comp_2` DATE}, primaryKeys=id_, options=()}
+ DataChangeEvent{tableId=foo.bar.baz, before=[], after=[1, 1970-01-02, 1970-01-01], op=INSERT, meta=()}
+ DataChangeEvent{tableId=foo.bar.baz, before=[1, 1970-01-02, 1970-01-01], after=[-1, 1970-01-09, 1969-12-31], op=UPDATE, meta=()}
+ DataChangeEvent{tableId=foo.bar.baz, before=[-1, 1970-01-09, 1969-12-31], after=[], op=DELETE, meta=()}
+ DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, 2000-01-01, 2000-01-01], op=INSERT, meta=()}
+ DataChangeEvent{tableId=foo.bar.baz, before=[0, 2000-01-01, 2000-01-01], after=[], op=DELETE, meta=()}
+
+- do: TO_DATE with TO_TIMESTAMP_LTZ (convert epoch to date)
+ projection: |-
+ id_
+ TO_DATE(TO_TIMESTAMP_LTZ(bigint_, 0)) AS comp_1
+ primary-key: id_
+ # bigint_ = 5 for record1 => TO_TIMESTAMP_LTZ(5, 0) = epoch second 5 = 1970-01-01T00:00:05 UTC
+ # TO_DATE => 1970-01-01
+ # bigint_ = -5 for record2 => TO_TIMESTAMP_LTZ(-5, 0) = 1969-12-31T23:59:55 UTC
+ # TO_DATE => 1969-12-31
+ expect: |-
+ CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT NULL 'Identifier',`comp_1` DATE}, primaryKeys=id_, options=()}
+ DataChangeEvent{tableId=foo.bar.baz, before=[], after=[1, 1970-01-01], op=INSERT, meta=()}
+ DataChangeEvent{tableId=foo.bar.baz, before=[1, 1970-01-01], after=[-1, 1969-12-31], op=UPDATE, meta=()}
+ DataChangeEvent{tableId=foo.bar.baz, before=[-1, 1969-12-31], after=[], op=DELETE, meta=()}
+ DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null], op=INSERT, meta=()}
+ DataChangeEvent{tableId=foo.bar.baz, before=[0, null], after=[], op=DELETE, meta=()}
+
+- do: TO_DATE with DATE_FORMAT yyyyMMdd pattern (CDC partition key)
+ projection: |-
+ id_
+ DATE_FORMAT(timestamp_0_, 'yyyyMMdd') AS comp_1
+ TO_DATE(DATE_FORMAT(timestamp_6_, 'yyyyMMdd'), 'yyyyMMdd') AS comp_2
+ primary-key: id_
+ # timestamp_0_ record1 = 1970-01-02T10:17:36 => DATE_FORMAT = "19700102"
+ # timestamp_6_ record1 = 1970-01-03T17:09:27 => DATE_FORMAT = "19700103", TO_DATE("19700103","yyyyMMdd") = 1970-01-03
+ # timestamp_0_ record2 = 1970-01-09T08:57:36 => DATE_FORMAT = "19700109"
+ # timestamp_6_ record2 = 1970-01-10T15:49:27 => DATE_FORMAT = "19700110", TO_DATE("19700110","yyyyMMdd") = 1970-01-10
+ expect: |-
+ CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT NULL 'Identifier',`comp_1` STRING,`comp_2` DATE}, primaryKeys=id_, options=()}
+ DataChangeEvent{tableId=foo.bar.baz, before=[], after=[1, 19700102, 1970-01-03], op=INSERT, meta=()}
+ DataChangeEvent{tableId=foo.bar.baz, before=[1, 19700102, 1970-01-03], after=[-1, 19700109, 1970-01-10], op=UPDATE, meta=()}
+ DataChangeEvent{tableId=foo.bar.baz, before=[-1, 19700109, 1970-01-10], after=[], op=DELETE, meta=()}
+ DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, null], op=INSERT, meta=()}
+ DataChangeEvent{tableId=foo.bar.baz, before=[0, null, null], after=[], op=DELETE, meta=()}
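Several of the later regression cases chain TO_DATE with string-producing functions. Below is a minimal sketch of why `TO_DATE(FROM_UNIXTIME(x))` yields null while the `'yyyy-MM-dd'`, `'yyyy-MM-01'` and `'yyyyMMdd'` variants round-trip. It rests on three assumptions:

- the session time zone is UTC;
- parsing is strict, using `java.time.format.DateTimeFormatter`;
- a parse failure maps to null, which matches the expectations listed above.

The class and helper names are hypothetical, and this is not Flink CDC's own `DateTimeUtils`.

```java
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;

/** Hypothetical model of the string-chaining patterns in regression.yaml (not Flink CDC code). */
class ChainedToDateSketch {

    // FROM_UNIXTIME(seconds, pattern), assuming the session time zone is UTC.
    static String fromUnixtime(long epochSeconds, String pattern) {
        LocalDateTime utc = LocalDateTime.ofEpochSecond(epochSeconds, 0, ZoneOffset.UTC);
        return utc.format(DateTimeFormatter.ofPattern(pattern));
    }

    // DATE_FORMAT(timestamp, pattern) on a zone-less timestamp; non-letter characters
    // such as the "01" in "yyyy-MM-01" are copied through as literals.
    static String dateFormat(LocalDateTime timestamp, String pattern) {
        return timestamp.format(DateTimeFormatter.ofPattern(pattern));
    }

    // TO_DATE(string, pattern): the whole input must match the pattern, else null.
    static LocalDate toDate(String text, String pattern) {
        if (text == null) {
            return null;
        }
        try {
            return LocalDate.parse(text, DateTimeFormatter.ofPattern(pattern));
        } catch (DateTimeParseException e) {
            // e.g. "1970-01-01 00:00:05" leaves " 00:00:05" unparsed under "yyyy-MM-dd"
            return null;
        }
    }

    // TO_DATE(string) with the documented default pattern.
    static LocalDate toDate(String text) {
        return toDate(text, "yyyy-MM-dd");
    }
}
```

The month-start trick works because the pattern fixes the day digits before parsing. The same default-pattern mismatch that nulls `TO_DATE(FROM_UNIXTIME(5))` also explains the spec's NOTE recommending `FROM_UNIXTIME(x, 'yyyy-MM-dd')`.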
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/impl/TemporalFunctions.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/impl/TemporalFunctions.java
index e8e0291b5..de31a81fc 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/impl/TemporalFunctions.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/impl/TemporalFunctions.java
@@ -30,6 +30,7 @@ import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneId;
+import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.TimeZone;
@@ -152,6 +153,27 @@ public class TemporalFunctions {
return DateTimeUtils.parseDate(str, format);
}
+ public static LocalDate toDate(LocalDateTime localDateTime) {
+ if (localDateTime == null) {
+ return null;
+ }
+ return localDateTime.toLocalDate();
+ }
+
+ public static LocalDate toDate(ZonedDateTime zonedDateTime) {
+ if (zonedDateTime == null) {
+ return null;
+ }
+ return zonedDateTime.toLocalDate();
+ }
+
+ public static LocalDate toDate(Instant instant) {
+ if (instant == null) {
+ return null;
+ }
+ return LocalDateTime.ofInstant(instant, ZoneId.of("UTC")).toLocalDate();
+ }
+
public static LocalDateTime toTimestamp(String str, String timezone) {
return toTimestamp(str, "yyyy-MM-dd HH:mm:ss", timezone);
}
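To check the new overloads against the regression spec without running a pipeline, here is a self-contained sketch that mirrors them outside Flink CDC. The class name `ToDateOverloads` and the `countOnDay` helper are hypothetical.

Java picks the overload from the argument's static type. The `Instant` (TIMESTAMP_LTZ) variant is pinned to UTC rather than the pipeline time zone, which is what the `Pacific/Tarawa` case checks.

`countOnDay` is an added assumption that replays a miniature version of that 24-hour sweep for any date function.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.function.Function;

/** Hypothetical stand-alone copy of the three TO_DATE(timestamp) overloads added above. */
class ToDateOverloads {

    // TIMESTAMP -> DATE: drop the time of day.
    static LocalDate toDate(LocalDateTime localDateTime) {
        return localDateTime == null ? null : localDateTime.toLocalDate();
    }

    // TIMESTAMP_TZ -> DATE: date in the value's own zone or offset.
    static LocalDate toDate(ZonedDateTime zonedDateTime) {
        return zonedDateTime == null ? null : zonedDateTime.toLocalDate();
    }

    // TIMESTAMP_LTZ -> DATE: always evaluated in UTC, never in the pipeline time zone.
    static LocalDate toDate(Instant instant) {
        return instant == null ? null : LocalDateTime.ofInstant(instant, ZoneId.of("UTC")).toLocalDate();
    }

    // Counts how many of base + n hours (n in [from, to]) map to `day` under `toDateFn`.
    static int countOnDay(Instant base, int from, int to, LocalDate day,
                          Function<Instant, LocalDate> toDateFn) {
        int count = 0;
        for (int n = from; n <= to; n++) {
            if (day.equals(toDateFn.apply(base.plus(Duration.ofHours(n))))) {
                count++;
            }
        }
        return count;
    }
}
```

Take `base = Instant.ofEpochMilli(345678912L)`, which is `1970-01-05T00:01:18.912Z`. With it, `countOnDay(base, -12, 11, LocalDate.of(1970, 1, 4), ToDateOverloads::toDate)` is 12. A zone-dependent function such as `i -> LocalDateTime.ofInstant(i, ZoneId.of("Pacific/Tarawa")).toLocalDate()` instead puts all 24 shifted values on 1970-01-05, matching the "buggy implementation" note in the spec. The same UTC rule maps `TO_TIMESTAMP_LTZ(-5, 0)` to 1969-12-31.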
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java
index 6044fbd5d..eef0755c1 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java
@@ -285,8 +285,10 @@ public class TransformSqlOperatorTable extends ReflectiveSqlOperatorTable {
SqlTypeTransforms.FORCE_NULLABLE),
null,
OperandTypes.or(
+ // Only "from string" mode supports specifying formatter.
OperandTypes.family(SqlTypeFamily.STRING),
- OperandTypes.family(SqlTypeFamily.STRING, SqlTypeFamily.STRING)),
+ OperandTypes.family(SqlTypeFamily.STRING, SqlTypeFamily.STRING),
+ OperandTypes.family(SqlTypeFamily.TIMESTAMP)),
SqlFunctionCategory.TIMEDATE);
public static final SqlFunction TO_TIMESTAMP =
new SqlFunction(
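After this change, the TO_DATE operand checker accepts three shapes: `(STRING)`, `(STRING, STRING)` and `(TIMESTAMP)`. There is no `(TIMESTAMP, STRING)` shape, which is why the docs say the format argument applies only to the string overload.

Below is a plain-Java sketch of that matching rule. A hypothetical `Family` enum stands in for Calcite's `SqlTypeFamily`, so no Calcite dependency is needed. The sketch also assumes, as the regression spec implies but without checking Calcite's own family mapping, that TIMESTAMP_TZ and TIMESTAMP_LTZ operands fall into the TIMESTAMP family.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical, Calcite-free model of the TO_DATE operand rule changed above. */
class ToDateSignatureSketch {

    // Stand-in for Calcite's SqlTypeFamily; only the families needed here.
    enum Family { STRING, TIMESTAMP, NUMERIC }

    // One entry per alternative passed to OperandTypes.or(...) in the diff.
    static final List<List<Family>> ACCEPTED = Arrays.asList(
            Arrays.asList(Family.STRING),                // TO_DATE(string)
            Arrays.asList(Family.STRING, Family.STRING), // TO_DATE(string, format)
            Arrays.asList(Family.TIMESTAMP));            // TO_DATE(timestamp), no format

    // True if the operand families match one accepted signature exactly.
    static boolean accepts(Family... operands) {
        return ACCEPTED.contains(Arrays.asList(operands));
    }
}
```

Under this model, `TO_DATE(timestamp_0_, 'yyyy-MM-dd')` and a bare `TO_DATE(bigint_)` are both rejected. That is consistent with the regression spec converting epoch values through `FROM_UNIXTIME` or `TO_TIMESTAMP_LTZ` first.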