This is an automated email from the ASF dual-hosted git repository.
kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 4271144fa [FLINK-38906] Pass transform parser in context and polish built-in functions (#4248)
4271144fa is described below
commit 4271144faa0c7874122a07a8c97faca2d923f6be
Author: yuxiqian <[email protected]>
AuthorDate: Mon Feb 2 11:31:34 2026 +0800
[FLINK-38906] Pass transform parser in context and polish built-in functions (#4248)
Co-authored-by: Jia Fan <[email protected]>
---
docs/content.zh/docs/core-concept/transform.md | 29 +--
docs/content/docs/core-concept/transform.md | 145 +++++++-------
.../flink/FlinkPipelineTransformITCase.java | 11 +-
.../src/test/resources/specs/comparison.yaml | 4 -
.../src/test/resources/specs/decimal.yaml | 8 -
.../src/test/resources/specs/meta.yaml | 12 --
.../src/test/resources/specs/temporal.yaml | 12 --
.../functions/impl/ComparisonFunctions.java | 33 +++-
.../operators/transform/PostTransformOperator.java | 2 +-
.../operators/transform/PreTransformOperator.java | 2 +-
.../transform/ProjectionColumnProcessor.java | 3 +-
.../transform/TransformExpressionCompiler.java | 16 +-
.../transform/TransformExpressionKey.java | 28 +--
.../operators/transform/TransformFilter.java | 24 +--
.../transform/TransformFilterProcessor.java | 32 ++-
.../flink/cdc/runtime/parser/JaninoCompiler.java | 218 ++++++++++++---------
.../flink/cdc/runtime/parser/TransformParser.java | 46 ++++-
.../parser/metadata/TransformSqlOperatorTable.java | 46 +++++
.../cdc/runtime/parser/TransformParserTest.java | 47 ++++-
19 files changed, 447 insertions(+), 271 deletions(-)
diff --git a/docs/content.zh/docs/core-concept/transform.md b/docs/content.zh/docs/core-concept/transform.md
index 5b56057ce..aec40d70a 100644
--- a/docs/content.zh/docs/core-concept/transform.md
+++ b/docs/content.zh/docs/core-concept/transform.md
@@ -142,18 +142,18 @@ Flink CDC 使用 [Calcite](https://calcite.apache.org/) 来解析表达式并且
## 数学函数
-| Function | Janino Code | Description |
-|---------------------|---------------------|-------------|
-| numeric1 + numeric2 | numeric1 + numeric2 | Returns NUMERIC1 plus NUMERIC2. |
-| numeric1 - numeric2 | numeric1 - numeric2 | Returns NUMERIC1 minus NUMERIC2. |
-| numeric1 * numeric2 | numeric1 * numeric2 | Returns NUMERIC1 multiplied by NUMERIC2. |
-| numeric1 / numeric2 | numeric1 / numeric2 | Returns NUMERIC1 divided by NUMERIC2. |
-| numeric1 % numeric2 | numeric1 % numeric2 | Returns the remainder (modulus) of numeric1 divided by numeric2. |
-| ABS(numeric) | abs(numeric) | Returns the absolute value of numeric. |
-| CEIL(numeric)<br/>CEILING(numeric) | ceil(numeric) | Rounds numeric up, and returns the smallest number that is greater than or equal to numeric. |
-| FLOOR(numeric) | floor(numeric) | Rounds numeric down, and returns the largest number that is less than or equal to numeric. |
-| ROUND(numeric, int) | round(numeric) | Returns a number rounded to INT decimal places for NUMERIC. |
-| UUID() | uuid() | Returns an UUID (Universally Unique Identifier) string (e.g., "3d3c68f7-f608-473f-b60c-b0c44ad4cc4e") according to RFC 4122 type 4 (pseudo randomly generated) UUID. |
+| Function | Janino Code | Description |
+|------------------------------------|---------------------|-------------|
+| numeric1 + numeric2 | numeric1 + numeric2 | Returns NUMERIC1 plus NUMERIC2. |
+| numeric1 - numeric2 | numeric1 - numeric2 | Returns NUMERIC1 minus NUMERIC2. |
+| numeric1 * numeric2 | numeric1 * numeric2 | Returns NUMERIC1 multiplied by NUMERIC2. |
+| numeric1 / numeric2 | numeric1 / numeric2 | Returns NUMERIC1 divided by NUMERIC2. |
+| numeric1 % numeric2 | numeric1 % numeric2 | Returns the remainder (modulus) of numeric1 divided by numeric2. |
+| ABS(numeric) | abs(numeric) | Returns the absolute value of numeric. |
+| CEIL(numeric)<br/>CEILING(numeric) | ceil(numeric) | Rounds numeric up, and returns the smallest number that is greater than or equal to numeric. |
+| FLOOR(numeric) | floor(numeric) | Rounds numeric down, and returns the largest number that is less than or equal to numeric. |
+| ROUND(numeric, int) | round(numeric) | Returns a number rounded to INT decimal places for NUMERIC. |
+| UUID() | uuid() | Returns a UUID (Universally Unique Identifier) string (e.g., "3d3c68f7-f608-473f-b60c-b0c44ad4cc4e") according to RFC 4122 type 4 (pseudo-randomly generated) UUID. |
## 字符串函数
@@ -180,13 +180,18 @@ Flink CDC 使用 [Calcite](https://calcite.apache.org/) 来解析表达式并且
| CURRENT_TIMESTAMP | currentTimestamp() | Returns the current SQL timestamp in the local time zone, the return type is TIMESTAMP_LTZ(3). [...]
| NOW() | now() | Returns the current SQL timestamp in the local time zone, this is a synonym of CURRENT_TIMESTAMP. [...]
| DATE_FORMAT(timestamp, string) | dateFormat(timestamp, string) | Converts timestamp to a value of string in the format specified by the date format string. The format string is compatible with Java's SimpleDateFormat. [...]
+| DATE_FORMAT(date, string) | dateFormat(date, string) | Converts given date to a value of string in the format specified by the format string. The format string is compatible with Java's SimpleDateFormat. [...]
+| DATE_FORMAT(time, string) | dateFormat(time, string) | Converts given time to a value of string in the format specified by the format string. The format string is compatible with Java's SimpleDateFormat. [...]
+| DATE_FORMAT_TZ(timestamp, format, timezone) | dateFormatTz(timestamp, format, timezone) | Formats a timestamp or datetime value as a string using the given pattern and the specified time zone. The timezone argument can be a time zone ID (for example, 'UTC', 'Asia/Shanghai') or an offset such as '+08:00'. [...]
| TIMESTAMPADD(timeintervalunit, interval, timepoint) | timestampadd(timeintervalunit, interval, timepoint) | Returns the timestamp of timepoint2 after timepoint added interval. The unit for the interval is given by the first argument, which should be one of the following values: SECOND, MINUTE, HOUR, DAY, MONTH, or YEAR. [...]
| TIMESTAMPDIFF(timepointunit, timepoint1, timepoint2) | timestampDiff(timepointunit, timepoint1, timepoint2) | Returns the (signed) number of timepointunit between timepoint1 and timepoint2. The unit for the interval is given by the first argument, which should be one of the following values: SECOND, MINUTE, HOUR, DAY, MONTH, or YEAR. [...]
| TO_DATE(string1[, string2]) | toDate(string1[, string2]) | Converts a date string string1 with format string2 (by default 'yyyy-MM-dd') to a date. [...]
| TO_TIMESTAMP(string1[, string2]) | toTimestamp(string1[, string2]) | Converts date time string string1 with format string2 (by default: 'yyyy-MM-dd HH:mm:ss') to a timestamp, without time zone. [...]
+| TO_TIMESTAMP_LTZ(string1[, string2]) | toTimestampLtz(string1[, string2]) | Converts date time string string1 with format string2 (by default: 'yyyy-MM-dd HH:mm:ss') to a timestamp, with local time zone. [...]
| FROM_UNIXTIME(numeric[, string]) | fromUnixtime(NUMERIC[, STRING]) | Returns a representation of the numeric argument as a value in string format (default is ‘yyyy-MM-dd HH:mm:ss’). numeric is an internal timestamp value representing seconds since ‘1970-01-01 00:00:00’ UTC, such as produced by the UNIX_TIMESTAMP() function. The return value is expressed in the session time zone (specified in TableConfig). E.g., FROM_UNIXTIME(44) returns ‘1970-01 [...]
| UNIX_TIMESTAMP() | unixTimestamp() | Gets current Unix timestamp in seconds. This function is not deterministic which means the value would be recalculated for each record. [...]
| UNIX_TIMESTAMP(string1[, string2]) | unixTimestamp(STRING1[, STRING2]) | Converts a date time string string1 with format string2 (by default: yyyy-MM-dd HH:mm:ss if not specified) to Unix timestamp (in seconds), using the specified timezone in table config.<br/>If a time zone is specified in the date time string and parsed by UTC+X format such as “yyyy-MM-dd HH:mm:ss.SSS X”, this function will use the specified timezone in the date time string inste [...]
+| DATE_ADD(date, int) | dateAdd(date, int) | Adds N days to the given date and returns a string in 'yyyy-MM-dd' format. [...]
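The new DATE_FORMAT_TZ row above formats a timestamp with an explicit zone ID or offset. A minimal illustrative sketch of that behavior using plain JDK time APIs (the helper name mirrors the Janino column but is hypothetical, not the actual Flink CDC implementation):

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class DateFormatTzSketch {
    // Sketch of DATE_FORMAT_TZ semantics: format an instant using the given
    // pattern in the given zone; the zone may be an ID or an offset string.
    static String dateFormatTz(Instant ts, String pattern, String zone) {
        return DateTimeFormatter.ofPattern(pattern)
                .withZone(ZoneId.of(zone))
                .format(ts);
    }

    public static void main(String[] args) {
        Instant epoch = Instant.ofEpochSecond(0);
        // Same instant, rendered in two different zones.
        System.out.println(dateFormatTz(epoch, "yyyy-MM-dd HH:mm:ss", "UTC"));    // 1970-01-01 00:00:00
        System.out.println(dateFormatTz(epoch, "yyyy-MM-dd HH:mm:ss", "+08:00")); // 1970-01-01 08:00:00
    }
}
```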
## 条件函数
diff --git a/docs/content/docs/core-concept/transform.md b/docs/content/docs/core-concept/transform.md
index 024401782..584c3606d 100644
--- a/docs/content/docs/core-concept/transform.md
+++ b/docs/content/docs/core-concept/transform.md
@@ -112,91 +112,96 @@ Flink CDC uses [Calcite](https://calcite.apache.org/) to parse expressions and [
## Comparison Functions
-| Function | Janino Code | Description |
-|----------------------|----------------------------------------------|-------------|
-| value1 = value2 | valueEquals(value1, value2) | Returns TRUE if value1 is equal to value2; returns FALSE if value1 or value2 is NULL. |
-| value1 <> value2 | !valueEquals(value1, value2) | Returns TRUE if value1 is not equal to value2; returns FALSE if value1 or value2 is NULL. |
-| value1 > value2 | greaterThan(value1, value2) | Returns TRUE if value1 is greater than value2; returns FALSE if value1 or value2 is NULL. |
-| value1 >= value2 | greaterThanOrEqual(value1, value2) | Returns TRUE if value1 is greater than or equal to value2; returns FALSE if value1 or value2 is NULL. |
-| value1 < value2 | lessThan(value1, value2) | Returns TRUE if value1 is less than value2; returns FALSE if value1 or value2 is NULL. |
-| value1 <= value2 | lessThanOrEqual(value1, value2) | Returns TRUE if value1 is less than or equal to value2; returns FALSE if value1 or value2 is NULL. |
-| value IS NULL | null == value | Returns TRUE if value is NULL. |
-| value IS NOT NULL | null != value | Returns TRUE if value is not NULL. |
-| value1 BETWEEN value2 AND value3 | betweenAsymmetric(value1, value2, value3) | Returns TRUE if value1 is greater than or equal to value2 and less than or equal to value3. |
-| value1 NOT BETWEEN value2 AND value3 | notBetweenAsymmetric(value1, value2, value3) | Returns TRUE if value1 is less than value2 or greater than value3. |
-| string1 LIKE string2 | like(string1, string2) | Returns TRUE if string1 matches pattern string2. |
-| string1 NOT LIKE string2 | notLike(string1, string2) | Returns TRUE if string1 does not match pattern string2. |
-| value1 IN (value2 [, value3]* ) | in(value1, value2 [, value3]*) | Returns TRUE if value1 exists in the given list (value2, value3, …). |
-| value1 NOT IN (value2 [, value3]* ) | notIn(value1, value2 [, value3]*) | Returns TRUE if value1 does not exist in the given list (value2, value3, …). |
+| Function | Janino Code | Description |
+|--------------------------------------|----------------------------------------------|-------------|
+| value1 = value2 | valueEquals(value1, value2) | Returns TRUE if value1 is equal to value2; returns FALSE if value1 or value2 is NULL. |
+| value1 <> value2 | !valueEquals(value1, value2) | Returns TRUE if value1 is not equal to value2; returns FALSE if value1 or value2 is NULL. |
+| value1 > value2 | greaterThan(value1, value2) | Returns TRUE if value1 is greater than value2; returns FALSE if value1 or value2 is NULL. |
+| value1 >= value2 | greaterThanOrEqual(value1, value2) | Returns TRUE if value1 is greater than or equal to value2; returns FALSE if value1 or value2 is NULL. |
+| value1 < value2 | lessThan(value1, value2) | Returns TRUE if value1 is less than value2; returns FALSE if value1 or value2 is NULL. |
+| value1 <= value2 | lessThanOrEqual(value1, value2) | Returns TRUE if value1 is less than or equal to value2; returns FALSE if value1 or value2 is NULL. |
+| value IS NULL | null == value | Returns TRUE if value is NULL. |
+| value IS NOT NULL | null != value | Returns TRUE if value is not NULL. |
+| value1 BETWEEN value2 AND value3 | betweenAsymmetric(value1, value2, value3) | Returns TRUE if value1 is greater than or equal to value2 and less than or equal to value3. |
+| value1 NOT BETWEEN value2 AND value3 | notBetweenAsymmetric(value1, value2, value3) | Returns TRUE if value1 is less than value2 or greater than value3. |
+| string1 LIKE string2 | like(string1, string2) | Returns TRUE if string1 matches pattern string2. |
+| string1 NOT LIKE string2 | notLike(string1, string2) | Returns TRUE if string1 does not match pattern string2. |
+| value1 IN (value2 [, value3]* ) | in(value1, value2 [, value3]*) | Returns TRUE if value1 exists in the given list (value2, value3, …). |
+| value1 NOT IN (value2 [, value3]* ) | notIn(value1, value2 [, value3]*) | Returns TRUE if value1 does not exist in the given list (value2, value3, …). |
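The comparison functions above are null-safe: when either operand is NULL, they return FALSE rather than NULL. A minimal sketch of that convention (helper names follow the Janino column, but the bodies are illustrative assumptions, not the actual `ComparisonFunctions` code):

```java
import java.util.Arrays;
import java.util.Objects;

public class ComparisonSketch {
    // Per the table: equality returns FALSE (not NULL) when either side is NULL.
    static boolean valueEquals(Object a, Object b) {
        return a != null && b != null && Objects.equals(a, b);
    }

    // BETWEEN value2 AND value3: inclusive on both bounds.
    static boolean betweenAsymmetric(int v, int lo, int hi) {
        return v >= lo && v <= hi;
    }

    // IN (value2, value3, ...): membership test over the argument list.
    static boolean in(Object needle, Object... haystack) {
        return Arrays.asList(haystack).contains(needle);
    }

    public static void main(String[] args) {
        System.out.println(valueEquals(1, 1));           // true
        System.out.println(valueEquals(null, 1));        // false, not NULL
        System.out.println(betweenAsymmetric(5, 1, 10)); // true
        System.out.println(in("b", "a", "b", "c"));      // true
    }
}
```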
## Logical Functions
-| Function | Janino Code | Description |
-|----------------------|-----------------------------|-------------|
-| boolean1 OR boolean2 | boolean1 || boolean2 | Returns TRUE if BOOLEAN1 is TRUE or BOOLEAN2 is TRUE. |
-| boolean1 AND boolean2 | boolean1 && boolean2 | Returns TRUE if BOOLEAN1 and BOOLEAN2 are both TRUE. |
-| NOT boolean | !boolean | Returns TRUE if boolean is FALSE; returns FALSE if boolean is TRUE. |
-| boolean IS FALSE | false == boolean | Returns TRUE if boolean is FALSE; returns FALSE if boolean is TRUE. |
-| boolean IS NOT FALSE | true == boolean | Returns TRUE if BOOLEAN is TRUE; returns FALSE if BOOLEAN is FALSE. |
-| boolean IS TRUE | true == boolean | Returns TRUE if BOOLEAN is TRUE; returns FALSE if BOOLEAN is FALSE. |
-| boolean IS NOT TRUE | false == boolean | Returns TRUE if boolean is FALSE; returns FALSE if boolean is TRUE. |
+| Function | Janino Code | Description |
+|-----------------------|--------------------------------|-------------|
+| boolean1 OR boolean2 | boolean1 || boolean2 | Returns TRUE if BOOLEAN1 is TRUE or BOOLEAN2 is TRUE. |
+| boolean1 AND boolean2 | boolean1 && boolean2 | Returns TRUE if BOOLEAN1 and BOOLEAN2 are both TRUE. |
+| NOT boolean | !boolean | Returns TRUE if boolean is FALSE; returns FALSE if boolean is TRUE. |
+| boolean IS FALSE | false == boolean | Returns TRUE if boolean is FALSE; returns FALSE if boolean is TRUE. |
+| boolean IS NOT FALSE | true == boolean | Returns TRUE if BOOLEAN is TRUE; returns FALSE if BOOLEAN is FALSE. |
+| boolean IS TRUE | true == boolean | Returns TRUE if BOOLEAN is TRUE; returns FALSE if BOOLEAN is FALSE. |
+| boolean IS NOT TRUE | false == boolean | Returns TRUE if boolean is FALSE; returns FALSE if boolean is TRUE. |
## Arithmetic Functions
-| Function | Janino Code | Description |
-|------------------------------------|-----------------------------|-------------|
-| numeric1 + numeric2 | numeric1 + numeric2 | Returns NUMERIC1 plus NUMERIC2. |
-| numeric1 - numeric2 | numeric1 - numeric2 | Returns NUMERIC1 minus NUMERIC2. |
-| numeric1 * numeric2 | numeric1 * numeric2 | Returns NUMERIC1 multiplied by NUMERIC2. |
-| numeric1 / numeric2 | numeric1 / numeric2 | Returns NUMERIC1 divided by NUMERIC2. |
-| numeric1 % numeric2 | numeric1 % numeric2 | Returns the remainder (modulus) of numeric1 divided by numeric2. |
-| ABS(numeric) | abs(numeric) | Returns the absolute value of numeric. |
-| CEIL(numeric)<br/>CEILING(numeric) | ceil(numeric) | Rounds numeric up, and returns the smallest number that is greater than or equal to numeric. |
-| FLOOR(numeric) | floor(numeric) | Rounds numeric down, and returns the largest number that is less than or equal to numeric. |
-| ROUND(numeric, int) | round(numeric) | Returns a number rounded to INT decimal places for NUMERIC. |
-| UUID() | uuid() | Returns an UUID (Universally Unique Identifier) string (e.g., "3d3c68f7-f608-473f-b60c-b0c44ad4cc4e") according to RFC 4122 type 4 (pseudo randomly generated) UUID. |
+| Function | Janino Code | Description |
+|------------------------------------|---------------------|-------------|
+| numeric1 + numeric2 | numeric1 + numeric2 | Returns NUMERIC1 plus NUMERIC2. |
+| numeric1 - numeric2 | numeric1 - numeric2 | Returns NUMERIC1 minus NUMERIC2. |
+| numeric1 * numeric2 | numeric1 * numeric2 | Returns NUMERIC1 multiplied by NUMERIC2. |
+| numeric1 / numeric2 | numeric1 / numeric2 | Returns NUMERIC1 divided by NUMERIC2. |
+| numeric1 % numeric2 | numeric1 % numeric2 | Returns the remainder (modulus) of numeric1 divided by numeric2. |
+| ABS(numeric) | abs(numeric) | Returns the absolute value of numeric. |
+| CEIL(numeric)<br/>CEILING(numeric) | ceil(numeric) | Rounds numeric up, and returns the smallest number that is greater than or equal to numeric. |
+| FLOOR(numeric) | floor(numeric) | Rounds numeric down, and returns the largest number that is less than or equal to numeric. |
+| ROUND(numeric, int) | round(numeric) | Returns a number rounded to INT decimal places for NUMERIC. |
+| UUID() | uuid() | Returns a UUID (Universally Unique Identifier) string (e.g., "3d3c68f7-f608-473f-b60c-b0c44ad4cc4e") according to RFC 4122 type 4 (pseudo-randomly generated) UUID. |
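The CEIL/FLOOR/ROUND rows above can be sketched with plain JDK calls. This is illustrative only (the real Janino-compiled expressions may differ in numeric type handling); the helper names are assumptions mirroring the table:

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

public class ArithmeticSketch {
    // CEIL: smallest value >= n; FLOOR: largest value <= n.
    static double ceil(double n)  { return Math.ceil(n); }
    static double floor(double n) { return Math.floor(n); }

    // ROUND(numeric, int): round to the given number of decimal places.
    static BigDecimal round(BigDecimal n, int places) {
        return n.setScale(places, RoundingMode.HALF_UP);
    }

    public static void main(String[] args) {
        System.out.println(ceil(1.2));                           // 2.0
        System.out.println(floor(-1.2));                         // -2.0
        System.out.println(round(new BigDecimal("3.14159"), 2)); // 3.14
    }
}
```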
## String Functions
-| Function | Janino Code | Description |
-| -------------------- | ------------------------ |-------------|
-| string1 || string2 | concat(string1, string2) | Returns the concatenation of STRING1 and STRING2. |
-| CHAR_LENGTH(string) | charLength(string) | Returns the number of characters in STRING. |
-| UPPER(string) | upper(string) | Returns string in uppercase. |
-| LOWER(string) | lower(string) | Returns string in lowercase. |
-| TRIM(string1) | trim('BOTH',string1) | Returns a string that removes whitespaces at both sides. |
-| REGEXP_REPLACE(string1, string2, string3) | regexpReplace(string1, string2, string3) | Returns a string from STRING1 with all the substrings that match a regular expression STRING2 consecutively being replaced with STRING3. E.g., 'foobar'.regexpReplace('oo\|ar', '') returns "fb". |
-| SUBSTR(string, integer1[, integer2]) | substr(string,integer1,integer2) | Returns a substring of STRING starting from position integer1 with length integer2 (to the end by default). |
-| SUBSTRING(string FROM integer1 [ FOR integer2 ]) | substring(string,integer1,integer2) | Returns a substring of STRING starting from position integer1 with length integer2 (to the end by default). |
-| CONCAT(string1, string2,…) | concat(string1, string2,…) | Returns a string that concatenates string1, string2, …. E.g., CONCAT('AA', 'BB', 'CC') returns 'AABBCC'. |
+| Function | Janino Code | Description |
+|--------------------------------------------------|------------------------------------------|-------------|
+| string1 || string2 | concat(string1, string2) | Returns the concatenation of STRING1 and STRING2. |
+| CHAR_LENGTH(string) | charLength(string) | Returns the number of characters in STRING. |
+| UPPER(string) | upper(string) | Returns string in uppercase. |
+| LOWER(string) | lower(string) | Returns string in lowercase. |
+| TRIM(string1) | trim('BOTH',string1) | Returns a string that removes whitespaces at both sides. |
+| REGEXP_REPLACE(string1, string2, string3) | regexpReplace(string1, string2, string3) | Returns a string from STRING1 with all the substrings that match a regular expression STRING2 consecutively being replaced with STRING3. E.g., 'foobar'.regexpReplace('oo\|ar', '') returns "fb". |
+| SUBSTR(string, integer1[, integer2]) | substr(string,integer1,integer2) | Returns a substring of STRING starting from position integer1 with length integer2 (to the end by default). |
+| SUBSTRING(string FROM integer1 [ FOR integer2 ]) | substring(string,integer1,integer2) | Returns a substring of STRING starting from position integer1 with length integer2 (to the end by default). |
+| CONCAT(string1, string2,…) | concat(string1, string2,…) | Returns a string that concatenates string1, string2, …. E.g., CONCAT('AA', 'BB', 'CC') returns 'AABBCC'. |
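The string-function rows map closely onto JDK string operations. A minimal sketch of the documented semantics, including the table's own `'foobar'.regexpReplace('oo|ar', '')` example (helper names mirror the Janino column but are assumptions, not the real implementations):

```java
public class StringSketch {
    static int charLength(String s) { return s.length(); }

    // REGEXP_REPLACE: replace every match of the regex with the replacement.
    static String regexpReplace(String s, String regex, String replacement) {
        return s.replaceAll(regex, replacement);
    }

    // SUBSTR uses 1-based positions, like SQL; length is capped at end of string.
    static String substr(String s, int start, int len) {
        int from = start - 1;
        return s.substring(from, Math.min(from + len, s.length()));
    }

    public static void main(String[] args) {
        System.out.println(charLength("flink"));                  // 5
        System.out.println(regexpReplace("foobar", "oo|ar", "")); // fb
        System.out.println(substr("foobar", 2, 3));               // oob
    }
}
```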
## Temporal Functions
-| Function | Janino Code | Description |
-| -------------------- | ------------------------ | ------------------------------------------------- |
-| LOCALTIME | localtime() | Returns the current SQL time in the local time zone, the return type is TIME(0). |
-| LOCALTIMESTAMP | localtimestamp() | Returns the current SQL timestamp in local time zone, the return type is TIMESTAMP(3). |
-| CURRENT_TIME | currentTime() | Returns the current SQL time in the local time zone, this is a synonym of LOCAL_TIME. |
-| CURRENT_DATE | currentDate() | Returns the current SQL date in the local time zone. |
-| CURRENT_TIMESTAMP | currentTimestamp() | Returns the current SQL timestamp in the local time zone, the return type is TIMESTAMP_LTZ(3). |
-| NOW() | now() | Returns the current SQL timestamp in the local time zone, this is a synonym of CURRENT_TIMESTAMP. |
-| DATE_FORMAT(timestamp, string) | dateFormat(timestamp, string) | Converts timestamp to a value of string in the format specified by the date format string. The format string is compatible with Java's SimpleDateFormat. |
-| TIMESTAMPADD(timeintervalunit, interval, timepoint) | timestampadd(timeintervalunit, interval, timepoint) | Returns the timestamp of timepoint2 after timepoint added interval. The unit for the interval is given by the first argument, which should be one of the following values: SECOND, MINUTE, HOUR, DAY, MONTH, or YEAR. |
-| TIMESTAMPDIFF(timepointunit, timepoint1, timepoint2) | timestampDiff(timepointunit, timepoint1, timepoint2) | Returns the (signed) number of timepointunit between timepoint1 and timepoint2. The unit for the interval is given by the first argument, which should be one of the following values: SECOND, MINUTE, HOUR, DAY, MONTH, or YEAR. |
-| TO_DATE(string1[, string2]) | toDate(string1[, string2]) | Converts a date string string1 with format string2 (by default 'yyyy-MM-dd') to a date. |
-| TO_TIMESTAMP(string1[, string2]) | toTimestamp(string1[, string2]) | Converts date time string string1 with format string2 (by default: 'yyyy-MM-dd HH:mm:ss') to a timestamp, without time zone. |
-| FROM_UNIXTIME(numeric[, string]) | fromUnixtime(NUMERIC[, STRING]) | Returns a representation of the numeric argument as a value in string format (default is ‘yyyy-MM-dd HH:mm:ss’). numeric is an internal timestamp value representing seconds since ‘1970-01-01 00:00:00’ UTC, such as produced by the UNIX_TIMESTAMP() function. The return value is expressed in the session time zone (specified in TableConfig). E.g., FROM_UNIXTIME(44) returns ‘1970-01-01 00:00:44’ if in UTC time zone, but re [...]
-| UNIX_TIMESTAMP() | unixTimestamp() | Gets current Unix timestamp in seconds. This function is not deterministic which means the value would be recalculated for each record. |
-| UNIX_TIMESTAMP(string1[, string2]) | unixTimestamp(STRING1[, STRING2]) | Converts a date time string string1 with format string2 (by default: yyyy-MM-dd HH:mm:ss if not specified) to Unix timestamp (in seconds), using the specified timezone in table config.<br/>If a time zone is specified in the date time string and parsed by UTC+X format such as “yyyy-MM-dd HH:mm:ss.SSS X”, this function will use the specified timezone in the date time string instead of the timezone in table config. I [...]
+| Function | Janino Code | Description [...]
+|------------------------------------------------------|------------------------------------------------------|------------- [...]
+| LOCALTIME | localtime() | Returns the current SQL time in the local time zone, the return type is TIME(0). [...]
+| LOCALTIMESTAMP | localtimestamp() | Returns the current SQL timestamp in local time zone, the return type is TIMESTAMP(3). [...]
+| CURRENT_TIME | currentTime() | Returns the current SQL time in the local time zone, this is a synonym of LOCAL_TIME. [...]
+| CURRENT_DATE | currentDate() | Returns the current SQL date in the local time zone. [...]
+| CURRENT_TIMESTAMP | currentTimestamp() | Returns the current SQL timestamp in the local time zone, the return type is TIMESTAMP_LTZ(3). [...]
+| NOW() | now() | Returns the current SQL timestamp in the local time zone, this is a synonym of CURRENT_TIMESTAMP. [...]
+| DATE_FORMAT(timestamp, string) | dateFormat(timestamp, string) | Converts timestamp to a value of string in the format specified by the format string. The format string is compatible with Java's SimpleDateFormat. [...]
+| DATE_FORMAT(date, string) | dateFormat(date, string) | Converts given date to a value of string in the format specified by the format string. The format string is compatible with Java's SimpleDateFormat. [...]
+| DATE_FORMAT(time, string) | dateFormat(time, string) | Converts given time to a value of string in the format specified by the format string. The format string is compatible with Java's SimpleDateFormat. [...]
+| DATE_FORMAT_TZ(timestamp, format, timezone) | dateFormatTz(timestamp, format, timezone) | Formats a timestamp or datetime value as a string using the given pattern and the specified time zone. The timezone argument can be a time zone ID (for example, 'UTC', 'Asia/Shanghai') or an offset such as '+08:00'. [...]
+| TIMESTAMPADD(timeintervalunit, interval, timepoint) | timestampadd(timeintervalunit, interval, timepoint) | Returns the timestamp of timepoint2 after timepoint added interval. The unit for the interval is given by the first argument, which should be one of the following values: SECOND, MINUTE, HOUR, DAY, MONTH, or YEAR. [...]
+| TIMESTAMPDIFF(timepointunit, timepoint1, timepoint2) | timestampDiff(timepointunit, timepoint1, timepoint2) | Returns the (signed) number of timepointunit between timepoint1 and timepoint2. The unit for the interval is given by the first argument, which should be one of the following values: SECOND, MINUTE, HOUR, DAY, MONTH, or YEAR. [...]
+| TO_DATE(string1[, string2]) | toDate(string1[, string2]) | Converts a date string string1 with format string2 (by default 'yyyy-MM-dd') to a date. [...]
+| TO_TIMESTAMP(string1[, string2]) | toTimestamp(string1[, string2]) | Converts date time string string1 with format string2 (by default: 'yyyy-MM-dd HH:mm:ss') to a timestamp, without time zone. [...]
+| TO_TIMESTAMP_LTZ(string1[, string2]) | toTimestampLtz(string1[, string2]) | Converts date time string string1 with format string2 (by default: 'yyyy-MM-dd HH:mm:ss') to a timestamp, with local time zone. [...]
+| FROM_UNIXTIME(numeric[, string]) | fromUnixtime(NUMERIC[, STRING]) | Returns a representation of the numeric argument as a value in string format (default is ‘yyyy-MM-dd HH:mm:ss’). numeric is an internal timestamp value representing seconds since ‘1970-01-01 00:00:00’ UTC, such as produced by the UNIX_TIMESTAMP() function. The return value is expressed in the session time zone (specified in TableConfig). E.g., FROM_UNIXTIME(44) returns ‘1970-01 [...]
+| UNIX_TIMESTAMP() | unixTimestamp() | Gets current Unix timestamp in seconds. This function is not deterministic which means the value would be recalculated for each record. [...]
+| UNIX_TIMESTAMP(string1[, string2]) | unixTimestamp(STRING1[, STRING2]) | Converts a date time string string1 with format string2 (by default: yyyy-MM-dd HH:mm:ss if not specified) to Unix timestamp (in seconds), using the specified timezone in table config.<br/>If a time zone is specified in the date time string and parsed by UTC+X format such as “yyyy-MM-dd HH:mm:ss.SSS X”, this function will use the specified timezone in the date time string inste [...]
+| DATE_ADD(date, int) | dateAdd(date, int) | Adds N days to the given date and returns a string in 'yyyy-MM-dd' format. [...]
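The table states that DATE_FORMAT patterns are SimpleDateFormat-compatible and that TIMESTAMPDIFF returns a signed count of units between two timepoints. A minimal sketch of both behaviors, under the assumption of SECOND as the unit (helper names are illustrative, not the runtime's actual code):

```java
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.Date;
import java.util.TimeZone;

public class TemporalSketch {
    // DATE_FORMAT-style formatting via SimpleDateFormat, as the docs state.
    static String dateFormat(long epochMillis, String pattern, String tz) {
        SimpleDateFormat fmt = new SimpleDateFormat(pattern);
        fmt.setTimeZone(TimeZone.getTimeZone(tz));
        return fmt.format(new Date(epochMillis));
    }

    // TIMESTAMPDIFF(SECOND, t1, t2): signed number of seconds from t1 to t2.
    static long timestampDiffSeconds(LocalDateTime t1, LocalDateTime t2) {
        return Duration.between(t1, t2).getSeconds();
    }

    public static void main(String[] args) {
        System.out.println(dateFormat(0L, "yyyy-MM-dd HH:mm:ss", "UTC")); // 1970-01-01 00:00:00
        System.out.println(timestampDiffSeconds(
                LocalDateTime.of(2026, 2, 2, 11, 0, 0),
                LocalDateTime.of(2026, 2, 2, 11, 31, 34))); // 1894
    }
}
```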
## Conditional Functions
-| Function | Janino Code | Description |
-| -------------------- | ------------------------ | ------------------------------------------------- |
-| CASE value WHEN value1_1 [, value1_2]* THEN RESULT1 (WHEN value2_1 [, value2_2 ]* THEN result_2)* (ELSE result_z) END | Nested ternary expression | Returns resultX when the first time value is contained in (valueX_1, valueX_2, …). When no value matches, returns result_z if it is provided and returns NULL otherwise. |
-| CASE WHEN condition1 THEN result1 (WHEN condition2 THEN result2)* (ELSE result_z) END | Nested ternary expression | Returns resultX when the first conditionX is met. When no condition is met, returns result_z if it is provided and returns NULL otherwise. |
-| COALESCE(value1 [, value2]*) | coalesce(Object... objects) | Returns the first argument that is not NULL.If all arguments are NULL, it returns NULL as well. The return type is the least restrictive, common type of all of its arguments. The return type is nullable if all arguments are nullable as well. |
-| IF(condition, true_value, false_value) | condition ? true_value : false_value | Returns the true_value if condition is met, otherwise false_value. E.g., IF(5 > 3, 5, 3) returns 5. |
+| Function | Janino Code | Description |
+|-----------------------------------------------------------------------------------------------------------------------|--------------------------------------|-------------|
+| CASE value WHEN value1_1 [, value1_2]* THEN RESULT1 (WHEN value2_1 [, value2_2 ]* THEN result_2)* (ELSE result_z) END | Nested ternary expression | Returns resultX when the first time value is contained in (valueX_1, valueX_2, …). When no value matches, returns result_z if it is provided and returns NULL otherwise. |
+| CASE WHEN condition1 THEN result1 (WHEN condition2 THEN result2)* (ELSE result_z) END | Nested ternary expression | Returns resultX when the first conditionX is met. When no condition is met, returns result_z if it is provided and returns NULL otherwise. |
+| COALESCE(value1 [, value2]*) | coalesce(Object... objects) | Returns the first argument that is not NULL. If all arguments are NULL, it returns NULL as well. The return type is the least restrictive, common type of all of its arguments. The return type is nullable if all arguments are nullable as well. |
+| IF(condition, true_value, false_value) | condition ? true_value : false_value | Returns the true_value if condition is met, otherwise false_value. E.g., IF(5 > 3, 5, 3) returns 5. |
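The table notes that IF compiles to a plain Java ternary and that COALESCE returns the first non-NULL argument. A minimal sketch of both, using the table's own IF(5 > 3, 5, 3) example (the `coalesce` body here is an assumption, not the actual runtime code):

```java
public class ConditionalSketch {
    // COALESCE(value1, value2, ...): first non-NULL argument, or NULL if none.
    static Object coalesce(Object... values) {
        for (Object v : values) {
            if (v != null) {
                return v;
            }
        }
        return null; // all arguments were NULL
    }

    public static void main(String[] args) {
        System.out.println(coalesce(null, null, "fallback")); // fallback
        // IF(5 > 3, 5, 3) compiles to the ternary below and returns 5.
        int result = 5 > 3 ? 5 : 3;
        System.out.println(result); // 5
    }
}
```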
## Casting Functions
diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
index c81b1ba46..e4da2fca3 100644
--- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
+++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
@@ -51,7 +51,6 @@ import
org.apache.flink.cdc.connectors.values.sink.ValuesDataSinkOptions;
import org.apache.flink.cdc.connectors.values.source.ValuesDataSourceHelper;
import org.apache.flink.cdc.connectors.values.source.ValuesDataSourceOptions;
import
org.apache.flink.cdc.runtime.operators.transform.exceptions.TransformException;
-import org.apache.flink.cdc.runtime.parser.JaninoCompiler;
import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.test.junit5.MiniClusterExtension;
@@ -2737,14 +2736,9 @@ class FlinkPipelineTransformITCase {
.cause()
.isExactlyInstanceOf(FlinkRuntimeException.class)
.hasMessage(
- "Failed to compile expression
TransformExpressionKey{originalExpression='id1 > 0', expression='"
- + JaninoCompiler.LOAD_MODULES_EXPRESSION
- + "greaterThan($0, 0)',
argumentNames=[__time_zone__, __epoch_time__], argumentClasses=[class
java.lang.String, class java.lang.Long], returnClass=class java.lang.Boolean,
columnNameMap={id1=$0}}")
+ "Failed to compile expression
TransformExpressionKey{originalExpression='id1 > 0',
compiledExpression='greaterThan($0, 0)', argumentNames=[__time_zone__,
__epoch_time__], argumentClasses=[class java.lang.String, class
java.lang.Long], returnClass=class java.lang.Boolean, columnNameMap={id1=$0}}")
.cause()
- .hasMessageContaining(
- "Compiled expression: "
- + JaninoCompiler.LOAD_MODULES_EXPRESSION
- + "greaterThan($0, 0)")
+ .hasMessageContaining("Compiled expression: greaterThan($0,
0)")
.hasMessageContaining("Column name map: {$0 -> id1}")
.rootCause()
.isExactlyInstanceOf(CompileException.class)
@@ -2812,7 +2806,6 @@ class FlinkPipelineTransformITCase {
+ "\tOriginal expression: name + 1 > 0\n"
+ "\tCompiled expression: greaterThan($0 + 1,
0)\n"
+ "\tColumn name map: {$0 -> name}")
- .hasMessageContaining("Column name map: {$0 -> name}")
.rootCause()
.isExactlyInstanceOf(RuntimeException.class)
.hasMessage(
diff --git a/flink-cdc-composer/src/test/resources/specs/comparison.yaml
b/flink-cdc-composer/src/test/resources/specs/comparison.yaml
index 99c6d6d2d..c022f0780 100644
--- a/flink-cdc-composer/src/test/resources/specs/comparison.yaml
+++ b/flink-cdc-composer/src/test/resources/specs/comparison.yaml
@@ -166,7 +166,6 @@
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, true,
false, true, false], op=INSERT, meta=()}
DataChangeEvent{tableId=foo.bar.baz, before=[0, null, true, false, true,
false], after=[], op=DELETE, meta=()}
- do: Between Op
- ignore: FLINK-38906
projection: |-
id_
tinyint_ BETWEEN CAST(1 AS TINYINT) AND CAST(3 AS TINYINT) AS comp_1
@@ -189,7 +188,6 @@
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, false, false,
false, false, false, false, false, false, false, false, false], op=INSERT,
meta=()}
DataChangeEvent{tableId=foo.bar.baz, before=[0, false, false, false,
false, false, false, false, false, false, false, false], after=[], op=DELETE,
meta=()}
- do: Not Between Op
- ignore: FLINK-38906
projection: |-
id_
tinyint_ NOT BETWEEN CAST(1 AS TINYINT) AND CAST(3 AS TINYINT) AS comp_1
@@ -238,7 +236,6 @@
projection: id_, char_ NOT LIKE 'A.*' AS comp_1, varchar_ NOT LIKE '.*rro'
AS comp_2, string_ NOT LIKE 'From [A-Z] to [A-Z] is Lie' AS comp_3
primary-key: id_
- do: In Op
- ignore: FLINK-38906
projection: |-
id_
tinyint_ IN (CAST(2 AS TINYINT)) AS comp_1
@@ -261,7 +258,6 @@
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, false, false,
false, false, false, false, false, false, false, false, false], op=INSERT,
meta=()}
DataChangeEvent{tableId=foo.bar.baz, before=[0, false, false, false,
false, false, false, false, false, false, false, false], after=[], op=DELETE,
meta=()}
- do: Not In Op
- ignore: FLINK-38906
projection: |-
id_
tinyint_ NOT IN (CAST(2 AS TINYINT)) AS comp_1
diff --git a/flink-cdc-composer/src/test/resources/specs/decimal.yaml
b/flink-cdc-composer/src/test/resources/specs/decimal.yaml
index e12ddb025..f1418dd32 100644
--- a/flink-cdc-composer/src/test/resources/specs/decimal.yaml
+++ b/flink-cdc-composer/src/test/resources/specs/decimal.yaml
@@ -14,7 +14,6 @@
################################################################################
- do: Add Op
- ignore: FLINK-38906
projection: |-
id_
decimal_10_0_ + CAST(1 AS DECIMAL(1, 0)) AS comp_1
@@ -27,7 +26,6 @@
DataChangeEvent{tableId=foo.bar.baz, before=[1, 1234567891, null],
after=[-1, -9876543209, null], op=UPDATE, meta=()}
DataChangeEvent{tableId=foo.bar.baz, before=[-1, -9876543209, null],
after=[], op=DELETE, meta=()}
- do: Subtract Op
- ignore: FLINK-38906
projection: |-
id_
decimal_10_0_ - CAST(1 AS DECIMAL(1, 0)) AS comp_1
@@ -40,7 +38,6 @@
DataChangeEvent{tableId=foo.bar.baz, before=[1, 1234567889, null],
after=[-1, -9876543211, null], op=UPDATE, meta=()}
DataChangeEvent{tableId=foo.bar.baz, before=[-1, -9876543211, null],
after=[], op=DELETE, meta=()}
- do: Multiply Op
- ignore: FLINK-38906
projection: |-
id_
decimal_10_0_ * CAST(2 AS DECIMAL(1, 0)) AS comp_1
@@ -53,7 +50,6 @@
DataChangeEvent{tableId=foo.bar.baz, before=[1, 2469135780, null],
after=[-1, -19753086420, null], op=UPDATE, meta=()}
DataChangeEvent{tableId=foo.bar.baz, before=[-1, -19753086420, null],
after=[], op=DELETE, meta=()}
- do: Divide Op
- ignore: FLINK-38906
projection: |-
id_
decimal_10_0_ / CAST(2 AS DECIMAL(1, 0)) AS comp_1
@@ -66,7 +62,6 @@
DataChangeEvent{tableId=foo.bar.baz, before=[1, 617.283945,
61728394506172839.45], after=[-1, -4938.271605, null], op=UPDATE, meta=()}
DataChangeEvent{tableId=foo.bar.baz, before=[-1, -4938.271605, null],
after=[], op=DELETE, meta=()}
- do: Abs Op
- ignore: FLINK-38906
projection: |-
id_
ABS(decimal_10_0_) AS comp_1
@@ -80,7 +75,6 @@
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, null],
op=INSERT, meta=()}
DataChangeEvent{tableId=foo.bar.baz, before=[0, null, null], after=[],
op=DELETE, meta=()}
- do: Ceil Op
- ignore: FLINK-38906
projection: |-
id_
CEIL(decimal_10_0_) AS comp_1
@@ -94,7 +88,6 @@
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, null],
op=INSERT, meta=()}
DataChangeEvent{tableId=foo.bar.baz, before=[0, null, null], after=[],
op=DELETE, meta=()}
- do: Floor Op
- ignore: FLINK-38906
projection: |-
id_
FLOOR(decimal_10_0_) AS comp_1
@@ -108,7 +101,6 @@
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, null],
op=INSERT, meta=()}
DataChangeEvent{tableId=foo.bar.baz, before=[0, null, null], after=[],
op=DELETE, meta=()}
- do: Round Op
- ignore: FLINK-38906
projection: |-
id_
ROUND(decimal_10_0_, 1) AS comp_1
diff --git a/flink-cdc-composer/src/test/resources/specs/meta.yaml
b/flink-cdc-composer/src/test/resources/specs/meta.yaml
index e8c893bf5..df6c2f857 100644
--- a/flink-cdc-composer/src/test/resources/specs/meta.yaml
+++ b/flink-cdc-composer/src/test/resources/specs/meta.yaml
@@ -47,15 +47,3 @@
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[-D, -1, false, -2,
-3, -4, -5, -7.7, -88.88, -9876543210, -987654321098765432.10, 爱丽丝, 疯帽子,
天地玄黄宇宙洪荒, 5LiA5LqM5LiJ5Zub5LqU, 5YWt5LiD5YWr5Lmd5Y2B,
5ZC+6Lyp44Gv54yr44Gn44GC44KL, 1970-01-09T08:57:36.789723456,
1970-01-10T15:49:27.891834561, 1970-01-11T22:41:18.912945612,
1970-01-09T08:57:36.789723456+08:00, 1970-01-10T15:49:27.891834561+01:00,
1970-01-11T22:41:18.912945612-04:00, 1970-01-09T08:57:36.789723456,
1970-01-10T15:49:27.89 [...]
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[+I, 0, null, null,
null, null, null, null, null, null, null, null, null, null, null, null, null,
null, null, null, null, null, null, null, null, null, null, null, null, null,
null, null, null, null, null, null], op=INSERT, meta=()}
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[-D, 0, null, null,
null, null, null, null, null, null, null, null, null, null, null, null, null,
null, null, null, null, null, null, null, null, null, null, null, null, null,
null, null, null, null, null, null], op=INSERT, meta=()}
-- do: Downcase Post Converter
- ignore: FLINK-38887
- projection: id_, 'UPCASE' AS UPCASE, 'downcase' AS downcase, 'MiXeD cAsE' AS
MiXeD_cAsE
- primary-key: id_
- converters: FIELD_NAME_LOWER_CASE
- expect: |-
- CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT
NULL 'Identifier',`upcase` STRING,`downcase` STRING,`mixed_case` STRING},
primaryKeys=id_, options=()}
- DataChangeEvent{tableId=foo.bar.baz, before=[], after=[1, UPCASE,
downcase, MiXeD cAsE], op=INSERT, meta=()}
- DataChangeEvent{tableId=foo.bar.baz, before=[1, UPCASE, downcase, MiXeD
cAsE], after=[-1, UPCASE, downcase, MiXeD cAsE], op=UPDATE, meta=()}
- DataChangeEvent{tableId=foo.bar.baz, before=[-1, UPCASE, downcase, MiXeD
cAsE], after=[], op=DELETE, meta=()}
- DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, UPCASE,
downcase, MiXeD cAsE], op=INSERT, meta=()}
- DataChangeEvent{tableId=foo.bar.baz, before=[0, UPCASE, downcase, MiXeD
cAsE], after=[], op=DELETE, meta=()}
diff --git a/flink-cdc-composer/src/test/resources/specs/temporal.yaml
b/flink-cdc-composer/src/test/resources/specs/temporal.yaml
index 3e485e659..3e6125296 100644
--- a/flink-cdc-composer/src/test/resources/specs/temporal.yaml
+++ b/flink-cdc-composer/src/test/resources/specs/temporal.yaml
@@ -78,7 +78,6 @@
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, null,
null, null, null, null], op=INSERT, meta=()}
DataChangeEvent{tableId=foo.bar.baz, before=[0, null, null, null, null,
null, null], after=[], op=DELETE, meta=()}
- do: To Date Function
- ignore: FLINK-38906
projection: |-
id_
TO_DATE('2025-01-05') AS comp_1
@@ -117,7 +116,6 @@
TO_TIMESTAMP('2024 !! 02 !! 14 11 !! 45 !! 49', 'yyyy//MM//dd') AS comp
expect-error: 'Unparseable date: "2024 !! 02 !! 14 11 !! 45 !! 49"'
- do: Format DateType and TimeType
- ignore: FLINK-38906
projection: |-
id_, date_, time_0_, time_6_
DATE_FORMAT(date_, 'yyyy->MM->dd') AS date_fmt_1
@@ -134,7 +132,6 @@
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, null,
null, null, null, null, null, null, null], op=INSERT, meta=()}
DataChangeEvent{tableId=foo.bar.baz, before=[0, null, null, null, null,
null, null, null, null, null], after=[], op=DELETE, meta=()}
- do: From UnixTime Function
- ignore: FLINK-38906
projection: |-
FROM_UNIXTIME(bigint_) AS col_1
FROM_UNIXTIME(bigint_, 'yyyy/MM/dd HH;mm;ss') AS col_2
@@ -146,7 +143,6 @@
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[null, null],
op=INSERT, meta=()}
DataChangeEvent{tableId=foo.bar.baz, before=[null, null], after=[],
op=DELETE, meta=()}
- do: From UnixTime Function with Implicit Conversion
- ignore: FLINK-38906
projection: |-
FROM_UNIXTIME(int_) AS col_1
FROM_UNIXTIME(int_, 'yyyy/MM/dd HH;mm;ss') AS col_2
@@ -158,7 +154,6 @@
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[null, null],
op=INSERT, meta=()}
DataChangeEvent{tableId=foo.bar.baz, before=[null, null], after=[],
op=DELETE, meta=()}
- do: To Timestamp LTZ Function
- ignore: FLINK-38906
projection: |-
TO_TIMESTAMP_LTZ(bigint_) AS col_1
TO_TIMESTAMP_LTZ(bigint_, 0) AS col_2
@@ -174,7 +169,6 @@
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[null, null, null,
1999-12-31T23:50:23.456, 2000-03-29T19:21:48.321, 2001-11-23T02:48:25.999],
op=INSERT, meta=()}
DataChangeEvent{tableId=foo.bar.baz, before=[null, null, null,
1999-12-31T23:50:23.456, 2000-03-29T19:21:48.321, 2001-11-23T02:48:25.999],
after=[], op=DELETE, meta=()}
- do: Formatting TIMESTAMP(0) with Timezone
- ignore: FLINK-38906
projection: |-
DATE_FORMAT_TZ(timestamp_0_, 'Asia/Shanghai') AS col_1
DATE_FORMAT_TZ(timestamp_0_, 'America/Los_Angeles') AS col_2
@@ -190,7 +184,6 @@
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[null, null, null,
null, null, null], op=INSERT, meta=()}
DataChangeEvent{tableId=foo.bar.baz, before=[null, null, null, null, null,
null], after=[], op=DELETE, meta=()}
- do: Formatting TIMESTAMP(6) with Timezone
- ignore: FLINK-38906
projection: |-
DATE_FORMAT_TZ(timestamp_6_, 'Asia/Shanghai') AS col_1
DATE_FORMAT_TZ(timestamp_6_, 'America/Los_Angeles') AS col_2
@@ -206,7 +199,6 @@
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[null, null, null,
null, null, null], op=INSERT, meta=()}
DataChangeEvent{tableId=foo.bar.baz, before=[null, null, null, null, null,
null], after=[], op=DELETE, meta=()}
- do: Formatting TIMESTAMP(9) with Timezone
- ignore: FLINK-38906
projection: |-
DATE_FORMAT_TZ(timestamp_9_, 'Asia/Shanghai') AS col_1
DATE_FORMAT_TZ(timestamp_9_, 'America/Los_Angeles') AS col_2
@@ -222,7 +214,6 @@
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[null, null, null,
null, null, null], op=INSERT, meta=()}
DataChangeEvent{tableId=foo.bar.baz, before=[null, null, null, null, null,
null], after=[], op=DELETE, meta=()}
- do: Formatting TIMESTAMP_LTZ(0) with Timezone
- ignore: FLINK-38906
projection: |-
DATE_FORMAT_TZ(timestamp_ltz_0_, 'Asia/Shanghai') AS col_1
DATE_FORMAT_TZ(timestamp_ltz_0_, 'America/Los_Angeles') AS col_2
@@ -238,7 +229,6 @@
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[null, null, null,
null, null, null], op=INSERT, meta=()}
DataChangeEvent{tableId=foo.bar.baz, before=[null, null, null, null, null,
null], after=[], op=DELETE, meta=()}
- do: Formatting TIMESTAMP_LTZ(6) with Timezone
- ignore: FLINK-38906
projection: |-
DATE_FORMAT_TZ(timestamp_ltz_6_, 'Asia/Shanghai') AS col_1
DATE_FORMAT_TZ(timestamp_ltz_6_, 'America/Los_Angeles') AS col_2
@@ -254,7 +244,6 @@
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[null, null, null,
null, null, null], op=INSERT, meta=()}
DataChangeEvent{tableId=foo.bar.baz, before=[null, null, null, null, null,
null], after=[], op=DELETE, meta=()}
- do: Formatting TIMESTAMP_LTZ(9) with Timezone
- ignore: FLINK-38906
projection: |-
DATE_FORMAT_TZ(timestamp_ltz_9_, 'Asia/Shanghai') AS col_1
DATE_FORMAT_TZ(timestamp_ltz_9_, 'America/Los_Angeles') AS col_2
@@ -270,7 +259,6 @@
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[null, null, null,
null, null, null], op=INSERT, meta=()}
DataChangeEvent{tableId=foo.bar.baz, before=[null, null, null, null, null,
null], after=[], op=DELETE, meta=()}
- do: DATE_ADD Function
- ignore: FLINK-38906
projection: |-
DATE_ADD(timestamp_0_, 17) AS col_1
DATE_ADD(timestamp_6_, 17) AS col_2
diff --git
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/impl/ComparisonFunctions.java
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/impl/ComparisonFunctions.java
index be2df0e1f..41605ca8b 100644
---
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/impl/ComparisonFunctions.java
+++
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/impl/ComparisonFunctions.java
@@ -78,6 +78,13 @@ public class ComparisonFunctions {
return value.compareTo(minValue) >= 0 && value.compareTo(maxValue) <=
0;
}
+ public static boolean betweenAsymmetric(Byte value, byte minValue, byte
maxValue) {
+ if (value == null) {
+ return false;
+ }
+ return value >= minValue && value <= maxValue;
+ }
+
public static boolean betweenAsymmetric(Short value, short minValue, short
maxValue) {
if (value == null) {
return false;
@@ -125,6 +132,10 @@ public class ComparisonFunctions {
return !betweenAsymmetric(value, minValue, maxValue);
}
+ public static boolean notBetweenAsymmetric(Byte value, byte minValue, byte
maxValue) {
+ return !betweenAsymmetric(value, minValue, maxValue);
+ }
+
public static boolean notBetweenAsymmetric(Short value, short minValue,
short maxValue) {
return !betweenAsymmetric(value, minValue, maxValue);
}
@@ -151,37 +162,45 @@ public class ComparisonFunctions {
}
public static boolean in(String value, String... str) {
- return Arrays.stream(str).anyMatch(item -> value.equals(item));
+ return Arrays.asList(str).contains(value);
+ }
+
+ public static boolean in(Byte value, Byte... values) {
+ return Arrays.asList(values).contains(value);
}
public static boolean in(Short value, Short... values) {
- return Arrays.stream(values).anyMatch(item -> value.equals(item));
+ return Arrays.asList(values).contains(value);
}
public static boolean in(Integer value, Integer... values) {
- return Arrays.stream(values).anyMatch(item -> value.equals(item));
+ return Arrays.asList(values).contains(value);
}
public static boolean in(Long value, Long... values) {
- return Arrays.stream(values).anyMatch(item -> value.equals(item));
+ return Arrays.asList(values).contains(value);
}
public static boolean in(Float value, Float... values) {
- return Arrays.stream(values).anyMatch(item -> value.equals(item));
+ return Arrays.asList(values).contains(value);
}
public static boolean in(Double value, Double... values) {
- return Arrays.stream(values).anyMatch(item -> value.equals(item));
+ return Arrays.asList(values).contains(value);
}
public static boolean in(BigDecimal value, BigDecimal... values) {
- return Arrays.stream(values).anyMatch(item -> value.equals(item));
+ return Arrays.asList(values).contains(value);
}
public static boolean notIn(String value, String... values) {
return !in(value, values);
}
+ public static boolean notIn(Byte value, Byte... values) {
+ return !in(value, values);
+ }
+
public static boolean notIn(Short value, Short... values) {
return !in(value, values);
}
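[Editorial note: the switch above from `Arrays.stream(values).anyMatch(item -> value.equals(item))` to `Arrays.asList(values).contains(value)` is what makes `in()` null-safe: `List.contains` uses null-tolerant equality, while the old lambda dereferenced a possibly-null value. A standalone sketch of the two behaviors (class and method names are illustrative):]

```java
import java.util.Arrays;

public class InNullSafetySketch {

    // Pre-patch style: throws NullPointerException when value is null,
    // because value.equals(item) dereferences value before comparing.
    static boolean inStream(Integer value, Integer... values) {
        return Arrays.stream(values).anyMatch(item -> value.equals(item));
    }

    // Patched style: contains() compares as (o == null ? e == null : o.equals(e)),
    // so a null needle simply yields false instead of throwing.
    static boolean inContains(Integer value, Integer... values) {
        return Arrays.asList(values).contains(value);
    }

    public static void main(String[] args) {
        System.out.println(inContains(2, 1, 2, 3));    // true
        System.out.println(inContains(null, 1, 2, 3)); // false
        try {
            inStream(null, 1, 2, 3);
        } catch (NullPointerException e) {
            System.out.println("stream variant throws NPE");
        }
    }
}
```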
diff --git
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java
index 4adc1f2a9..ae0b48d57 100644
---
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java
+++
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java
@@ -487,7 +487,7 @@ public class PostTransformOperator extends
AbstractStreamOperator<Event>
new PostTransformer(
selectors,
TransformProjection.of(projection).orElse(null),
- TransformFilter.of(filterExpression,
udfDescriptors).orElse(null),
+ TransformFilter.of(filterExpression).orElse(null),
PostTransformConverters.of(rule.getPostTransformConverter())
.orElse(null),
rule.getSupportedMetadataColumns());
diff --git
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java
index 61af9e370..ff6cc512f 100644
---
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java
+++
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java
@@ -116,7 +116,7 @@ public class PreTransformOperator extends
AbstractStreamOperator<Event>
new PreTransformer(
selectors,
TransformProjection.of(projection).orElse(null),
- TransformFilter.of(filter,
udfDescriptors).orElse(null)));
+ TransformFilter.of(filter).orElse(null)));
schemaMetadataTransformers.add(
new Tuple2<>(
selectors,
diff --git
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java
index f2cfc4256..db5c37ca2 100644
---
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java
+++
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java
@@ -131,7 +131,6 @@ public class ProjectionColumnProcessor {
List<Class<?>> paramTypes = new ArrayList<>();
List<Column> columns =
tableInfo.getPreTransformedSchema().getColumns();
- String scriptExpression = projectionColumn.getScriptExpression();
Map<String, String> columnNameMap =
projectionColumn.getColumnNameMap();
LinkedHashSet<String> originalColumnNames =
new LinkedHashSet<>(projectionColumn.getOriginalColumnNames());
@@ -171,7 +170,7 @@ public class ProjectionColumnProcessor {
return TransformExpressionKey.of(
projectionColumn.getExpression(),
- JaninoCompiler.loadSystemFunction(scriptExpression),
+ projectionColumn.getScriptExpression(),
argumentNames,
paramTypes,
JavaClassConverter.toJavaClass(projectionColumn.getDataType()),
diff --git
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformExpressionCompiler.java
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformExpressionCompiler.java
index bc496faf9..1085a7df8 100644
---
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformExpressionCompiler.java
+++
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformExpressionCompiler.java
@@ -26,6 +26,8 @@ import
org.apache.flink.shaded.guava31.com.google.common.cache.CacheBuilder;
import org.codehaus.commons.compiler.CompileException;
import org.codehaus.janino.ExpressionEvaluator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
@@ -36,6 +38,8 @@ import java.util.List;
*/
public class TransformExpressionCompiler {
+ private static final Logger LOG =
LoggerFactory.getLogger(TransformExpressionCompiler.class);
+
static final Cache<TransformExpressionKey, ExpressionEvaluator>
COMPILED_EXPRESSION_CACHE =
CacheBuilder.newBuilder().softValues().build();
@@ -71,9 +75,17 @@ public class TransformExpressionCompiler {
// Result type
expressionEvaluator.setExpressionType(key.getReturnClass());
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Going to evaluate expression: {}",
key.getFullExpression());
+ LOG.debug(" - Argument names: {}", argumentNames);
+ LOG.debug(" - Argument types: {}",
argumentClasses);
+ LOG.debug(" - Returns: {}", key.getReturnClass());
+ }
+
try {
// Compile
- expressionEvaluator.cook(key.getExpression());
+ expressionEvaluator.cook(key.getFullExpression());
} catch (CompileException e) {
throw new InvalidProgramException(
String.format(
@@ -82,7 +94,7 @@ public class TransformExpressionCompiler {
+ "\tCompiled expression:
%s\n"
+ "\tColumn name map:
{%s}",
key.getOriginalExpression(),
- key.getExpression(),
+ key.getCompiledExpression(),
TransformException.prettyPrintColumnNameMap(
key.getColumnNameMap())),
e);
diff --git
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformExpressionKey.java
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformExpressionKey.java
index 75cb13b66..5a15662e5 100644
---
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformExpressionKey.java
+++
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformExpressionKey.java
@@ -17,6 +17,8 @@
package org.apache.flink.cdc.runtime.operators.transform;
+import org.apache.flink.cdc.runtime.parser.JaninoCompiler;
+
import javax.annotation.Nullable;
import java.io.Serializable;
@@ -43,7 +45,7 @@ import java.util.Objects;
public class TransformExpressionKey implements Serializable {
private static final long serialVersionUID = 1L;
@Nullable private final String originalExpression;
- private final String expression;
+ private final String compiledExpression;
private final List<String> argumentNames;
private final List<Class<?>> argumentClasses;
private final Class<?> returnClass;
@@ -51,13 +53,13 @@ public class TransformExpressionKey implements Serializable
{
private TransformExpressionKey(
@Nullable String originalExpression,
- String expression,
+ String compiledExpression,
List<String> argumentNames,
List<Class<?>> argumentClasses,
Class<?> returnClass,
Map<String, String> columnNameMap) {
this.originalExpression = originalExpression;
- this.expression = expression;
+ this.compiledExpression = compiledExpression;
this.argumentNames = argumentNames;
this.argumentClasses = argumentClasses;
this.returnClass = returnClass;
@@ -69,8 +71,12 @@ public class TransformExpressionKey implements Serializable {
return originalExpression;
}
- public String getExpression() {
- return expression;
+ public String getCompiledExpression() {
+ return compiledExpression;
+ }
+
+ public String getFullExpression() {
+ return JaninoCompiler.loadSystemFunction(compiledExpression);
}
public List<String> getArgumentNames() {
@@ -91,14 +97,14 @@ public class TransformExpressionKey implements Serializable
{
public static TransformExpressionKey of(
@Nullable String originalExpression,
- String expression,
+ String compiledExpression,
List<String> argumentNames,
List<Class<?>> argumentClasses,
Class<?> returnClass,
Map<String, String> columnNameMap) {
return new TransformExpressionKey(
originalExpression,
- expression,
+ compiledExpression,
argumentNames,
argumentClasses,
returnClass,
@@ -115,7 +121,7 @@ public class TransformExpressionKey implements Serializable
{
}
TransformExpressionKey that = (TransformExpressionKey) o;
return Objects.equals(originalExpression, that.originalExpression)
- && expression.equals(that.expression)
+ && compiledExpression.equals(that.compiledExpression)
&& argumentNames.equals(that.argumentNames)
&& argumentClasses.equals(that.argumentClasses)
&& returnClass.equals(that.returnClass)
@@ -126,7 +132,7 @@ public class TransformExpressionKey implements Serializable
{
public int hashCode() {
return Objects.hash(
originalExpression,
- expression,
+ compiledExpression,
argumentNames,
argumentClasses,
returnClass,
@@ -139,8 +145,8 @@ public class TransformExpressionKey implements Serializable
{
+ "originalExpression='"
+ originalExpression
+ '\''
- + ", expression='"
- + expression
+ + ", compiledExpression='"
+ + compiledExpression
+ '\''
+ ", argumentNames="
+ argumentNames
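[Editorial note: the TransformExpressionKey refactor above keeps the cache key free of the static-function prelude: only compiledExpression participates in equals/hashCode, and the prelude is prepended lazily when getFullExpression() is called at compile time. A minimal sketch of the pattern; the prelude string here is a placeholder for what JaninoCompiler.loadSystemFunction actually prepends (its LOAD_MODULES_EXPRESSION):]

```java
public class ExpressionKeySketch {
    // Placeholder prelude; the real content is JaninoCompiler's
    // LOAD_MODULES_EXPRESSION, prepended by loadSystemFunction.
    static final String PRELUDE = "/* static imports for built-in functions */ ";

    // Cache key field: stays prelude-free, so logically identical
    // expressions hash and compare the same.
    final String compiledExpression;

    ExpressionKeySketch(String compiledExpression) {
        this.compiledExpression = compiledExpression;
    }

    // Only this full form would be handed to Janino's cook();
    // equals/hashCode (not shown) use compiledExpression alone.
    String getFullExpression() {
        return PRELUDE + compiledExpression;
    }

    public static void main(String[] args) {
        ExpressionKeySketch key = new ExpressionKeySketch("greaterThan($0, 0)");
        System.out.println(key.getFullExpression());
    }
}
```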
diff --git
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilter.java
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilter.java
index 3feaa0822..a9ea00dd0 100644
---
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilter.java
+++
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilter.java
@@ -41,17 +41,12 @@ import java.util.Optional;
public class TransformFilter implements Serializable {
private static final long serialVersionUID = 1L;
private final String expression;
- private final String scriptExpression;
private final List<String> columnNames;
private final Map<String, String> columnNameMap;
public TransformFilter(
- String expression,
- String scriptExpression,
- List<String> columnNames,
- Map<String, String> columnNameMap) {
+ String expression, List<String> columnNames, Map<String, String>
columnNameMap) {
this.expression = expression;
- this.scriptExpression = scriptExpression;
this.columnNames = columnNames;
this.columnNameMap = columnNameMap;
}
@@ -60,10 +55,6 @@ public class TransformFilter implements Serializable {
return expression;
}
- public String getScriptExpression() {
- return scriptExpression;
- }
-
public List<String> getColumnNames() {
return columnNames;
}
@@ -76,19 +67,13 @@ public class TransformFilter implements Serializable {
return TransformException.prettyPrintColumnNameMap(getColumnNameMap());
}
- public static Optional<TransformFilter> of(
- String filterExpression, List<UserDefinedFunctionDescriptor>
udfDescriptors) {
+ public static Optional<TransformFilter> of(String filterExpression) {
if (StringUtils.isNullOrWhitespaceOnly(filterExpression)) {
return Optional.empty();
}
List<String> columnNames =
TransformParser.parseFilterColumnNameList(filterExpression);
Map<String, String> columnNameMap =
TransformParser.generateColumnNameMap(columnNames);
- String scriptExpression =
- TransformParser.translateFilterExpressionToJaninoExpression(
- filterExpression, udfDescriptors, columnNameMap);
- return Optional.of(
- new TransformFilter(
- filterExpression, scriptExpression, columnNames,
columnNameMap));
+ return Optional.of(new TransformFilter(filterExpression, columnNames,
columnNameMap));
}
public boolean isValid() {
@@ -101,9 +86,6 @@ public class TransformFilter implements Serializable {
+ "expression='"
+ expression
+ '\''
- + ", scriptExpression='"
- + scriptExpression
- + '\''
+ ", columnNames="
+ columnNames
+ ", columnNameMap="
diff --git
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java
index 299266d1b..77dc5cfe6 100644
---
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java
+++
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java
@@ -22,6 +22,7 @@ import
org.apache.flink.cdc.common.converter.JavaClassConverter;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.source.SupportedMetadataColumn;
import org.apache.flink.cdc.runtime.parser.JaninoCompiler;
+import org.apache.flink.cdc.runtime.parser.TransformParser;
import org.codehaus.janino.ExpressionEvaluator;
@@ -69,7 +70,13 @@ public class TransformFilterProcessor {
this.transformExpressionKey = null;
this.expressionEvaluator = null;
} else {
- this.transformExpressionKey = generateTransformExpressionKey();
+ this.transformExpressionKey =
+ generateTransformExpressionKey(
+ tableInfo.getPreTransformedSchema().getColumns(),
+ udfDescriptors,
+ supportedMetadataColumns
+ .values()
+ .toArray(new SupportedMetadataColumn[0]));
this.expressionEvaluator =
TransformExpressionCompiler.compileExpression(
transformExpressionKey, udfDescriptors);
@@ -117,8 +124,12 @@ public class TransformFilterProcessor {
+ "\tCompiled expression: %s\n"
+ "\tColumn name map: {%s}",
tableInfo.getName(),
- transformFilter.getExpression(),
- transformFilter.getScriptExpression(),
+ transformExpressionKey != null
+ ?
transformExpressionKey.getOriginalExpression()
+ : "<no op>",
+ transformExpressionKey != null
+ ?
transformExpressionKey.getCompiledExpression()
+ : "<no op>",
transformFilter.getColumnNameMapAsString()),
e);
}
@@ -200,7 +211,10 @@ public class TransformFilterProcessor {
return params.toArray();
}
- private TransformExpressionKey generateTransformExpressionKey() {
+ private TransformExpressionKey generateTransformExpressionKey(
+ List<Column> columns,
+ List<UserDefinedFunctionDescriptor> udfDescriptors,
+ SupportedMetadataColumn[] supportedMetadataColumns) {
Tuple2<List<String>, List<Class<?>>> args = generateArguments(true);
args.f0.add(JaninoCompiler.DEFAULT_TIME_ZONE);
@@ -208,9 +222,17 @@ public class TransformFilterProcessor {
args.f0.add(JaninoCompiler.DEFAULT_EPOCH_TIME);
args.f1.add(Long.class);
+ String scriptExpression =
+ TransformParser.translateFilterExpressionToJaninoExpression(
+ transformFilter.getExpression(),
+ columns,
+ udfDescriptors,
+ supportedMetadataColumns,
+ transformFilter.getColumnNameMap());
+
return TransformExpressionKey.of(
transformFilter.getExpression(),
-
JaninoCompiler.loadSystemFunction(transformFilter.getScriptExpression()),
+ scriptExpression,
args.f0,
args.f1,
Boolean.class,
diff --git
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java
index b7fcfa0ea..4345331ac 100644
---
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java
+++
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java
@@ -21,6 +21,10 @@ import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.io.ParseException;
import org.apache.flink.cdc.common.annotation.VisibleForTesting;
import org.apache.flink.cdc.common.converter.JavaClassConverter;
+import org.apache.flink.cdc.common.schema.Column;
+import org.apache.flink.cdc.common.source.SupportedMetadataColumn;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypeRoot;
import org.apache.flink.cdc.common.utils.StringUtils;
import org.apache.flink.cdc.runtime.operators.transform.UserDefinedFunctionDescriptor;
@@ -78,7 +82,8 @@ public class JaninoCompiler {
"TIMESTAMPADD",
"TIMESTAMPDIFF",
"TIMESTAMP_DIFF",
- "DATE_FORMAT");
+ "DATE_FORMAT",
+ "DATE_ADD");
public static final String DEFAULT_EPOCH_TIME = "__epoch_time__";
public static final String DEFAULT_TIME_ZONE = "__time_zone__";
@@ -121,54 +126,47 @@ public class JaninoCompiler {
}
}
- public static String translateSqlNodeToJaninoExpression(
- SqlNode transform,
- List<UserDefinedFunctionDescriptor> udfDescriptors,
- Map<String, String> columnNameMap) {
- Java.Rvalue rvalue =
- translateSqlNodeToJaninoRvalue(transform, udfDescriptors, columnNameMap);
+ public static String translateSqlNodeToJaninoExpression(Context context, SqlNode transform) {
+ Java.Rvalue rvalue = translateSqlNodeToJaninoRvalue(context, transform);
if (rvalue != null) {
return rvalue.toString();
}
return "";
}
- public static Java.Rvalue translateSqlNodeToJaninoRvalue(
- SqlNode transform,
- List<UserDefinedFunctionDescriptor> udfDescriptors,
- Map<String, String> columnNameMap) {
+ public static Java.Rvalue translateSqlNodeToJaninoRvalue(Context context, SqlNode transform) {
if (transform instanceof SqlIdentifier) {
- return translateSqlIdentifier((SqlIdentifier) transform, columnNameMap);
+ return translateSqlIdentifier(context, (SqlIdentifier) transform);
} else if (transform instanceof SqlBasicCall) {
- return translateSqlBasicCall((SqlBasicCall) transform, udfDescriptors, columnNameMap);
+ return translateSqlBasicCall(context, (SqlBasicCall) transform);
} else if (transform instanceof SqlCase) {
- return translateSqlCase((SqlCase) transform, udfDescriptors, columnNameMap);
+ return translateSqlCase(context, (SqlCase) transform);
} else if (transform instanceof SqlLiteral) {
- return translateSqlSqlLiteral((SqlLiteral) transform);
+ return translateSqlSqlLiteral(context, (SqlLiteral) transform);
}
return null;
}
private static Java.Rvalue translateSqlIdentifier(
- SqlIdentifier sqlIdentifier, Map<String, String> columnNameMap) {
+ Context context, SqlIdentifier sqlIdentifier) {
String columnName = sqlIdentifier.names.get(sqlIdentifier.names.size() - 1);
if (TIMEZONE_FREE_TEMPORAL_FUNCTIONS.contains(columnName.toUpperCase())) {
- return generateTimezoneFreeTemporalFunctionOperation(columnName);
+ return generateTimezoneFreeTemporalFunctionOperation(context, columnName);
} else if (TIMEZONE_REQUIRED_TEMPORAL_FUNCTIONS.contains(columnName.toUpperCase())) {
- return generateTimezoneRequiredTemporalFunctionOperation(columnName);
+ return generateTimezoneRequiredTemporalFunctionOperation(context, columnName);
} else if (TIMEZONE_FREE_TEMPORAL_CONVERSION_FUNCTIONS.contains(columnName.toUpperCase())) {
- return generateTimezoneFreeTemporalConversionFunctionOperation(columnName);
+ return generateTimezoneFreeTemporalConversionFunctionOperation(context, columnName);
} else if (TIMEZONE_REQUIRED_TEMPORAL_CONVERSION_FUNCTIONS.contains(
columnName.toUpperCase())) {
- return generateTimezoneRequiredTemporalConversionFunctionOperation(columnName);
+ return generateTimezoneRequiredTemporalConversionFunctionOperation(context, columnName);
} else {
return new Java.AmbiguousName(
Location.NOWHERE,
- new String[] {columnNameMap.getOrDefault(columnName, columnName)});
+ new String[] {context.columnNameMap.getOrDefault(columnName, columnName)});
}
}
- private static Java.Rvalue translateSqlSqlLiteral(SqlLiteral sqlLiteral) {
+ private static Java.Rvalue translateSqlSqlLiteral(Context context, SqlLiteral sqlLiteral) {
if (sqlLiteral.getValue() == null) {
return new Java.NullLiteral(Location.NOWHERE);
}
@@ -190,14 +188,11 @@ public class JaninoCompiler {
return new Java.AmbiguousName(Location.NOWHERE, new String[] {value.toString()});
}
- private static Java.Rvalue translateSqlBasicCall(
- SqlBasicCall sqlBasicCall,
- List<UserDefinedFunctionDescriptor> udfDescriptors,
- Map<String, String> columnNameMap) {
+ private static Java.Rvalue translateSqlBasicCall(Context context, SqlBasicCall sqlBasicCall) {
List<SqlNode> operandList = sqlBasicCall.getOperandList();
List<Java.Rvalue> atoms = new ArrayList<>();
for (SqlNode sqlNode : operandList) {
- translateSqlNodeToAtoms(sqlNode, atoms, udfDescriptors, columnNameMap);
+ translateSqlNodeToAtoms(context, sqlNode, atoms);
}
if (TIMEZONE_FREE_TEMPORAL_FUNCTIONS.contains(
sqlBasicCall.getOperator().getName().toUpperCase())) {
@@ -210,27 +205,22 @@ public class JaninoCompiler {
sqlBasicCall.getOperator().getName().toUpperCase())) {
atoms.add(new Java.AmbiguousName(Location.NOWHERE, new String[] {DEFAULT_TIME_ZONE}));
}
- return sqlBasicCallToJaninoRvalue(
- sqlBasicCall, atoms.toArray(new Java.Rvalue[0]), udfDescriptors);
+ return sqlBasicCallToJaninoRvalue(context, sqlBasicCall, atoms.toArray(new Java.Rvalue[0]));
}
- private static Java.Rvalue translateSqlCase(
- SqlCase sqlCase,
- List<UserDefinedFunctionDescriptor> udfDescriptors,
- Map<String, String> columnNameMap) {
+ private static Java.Rvalue translateSqlCase(Context context, SqlCase sqlCase) {
SqlNodeList whenOperands = sqlCase.getWhenOperands();
SqlNodeList thenOperands = sqlCase.getThenOperands();
SqlNode elseOperand = sqlCase.getElseOperand();
List<Java.Rvalue> whenAtoms = new ArrayList<>();
for (SqlNode sqlNode : whenOperands) {
- translateSqlNodeToAtoms(sqlNode, whenAtoms, udfDescriptors, columnNameMap);
+ translateSqlNodeToAtoms(context, sqlNode, whenAtoms);
}
List<Java.Rvalue> thenAtoms = new ArrayList<>();
for (SqlNode sqlNode : thenOperands) {
- translateSqlNodeToAtoms(sqlNode, thenAtoms, udfDescriptors, columnNameMap);
+ translateSqlNodeToAtoms(context, sqlNode, thenAtoms);
}
- Java.Rvalue elseAtoms =
- translateSqlNodeToJaninoRvalue(elseOperand, udfDescriptors, columnNameMap);
+ Java.Rvalue elseAtoms = translateSqlNodeToJaninoRvalue(context, elseOperand);
Java.Rvalue sqlCaseRvalueTemp = elseAtoms;
for (int i = whenAtoms.size() - 1; i >= 0; i--) {
sqlCaseRvalueTemp =
@@ -244,50 +234,46 @@ public class JaninoCompiler {
}
private static void translateSqlNodeToAtoms(
- SqlNode sqlNode,
- List<Java.Rvalue> atoms,
- List<UserDefinedFunctionDescriptor> udfDescriptors,
- Map<String, String> columnNameMap) {
+ Context context, SqlNode sqlNode, List<Java.Rvalue> atoms) {
if (sqlNode instanceof SqlIdentifier) {
- atoms.add(translateSqlIdentifier((SqlIdentifier) sqlNode, columnNameMap));
+ atoms.add(translateSqlIdentifier(context, (SqlIdentifier) sqlNode));
} else if (sqlNode instanceof SqlLiteral) {
- atoms.add(translateSqlSqlLiteral((SqlLiteral) sqlNode));
+ atoms.add(translateSqlSqlLiteral(context, (SqlLiteral) sqlNode));
} else if (sqlNode instanceof SqlBasicCall) {
- atoms.add(translateSqlBasicCall((SqlBasicCall) sqlNode, udfDescriptors, columnNameMap));
+ atoms.add(translateSqlBasicCall(context, (SqlBasicCall) sqlNode));
} else if (sqlNode instanceof SqlNodeList) {
for (SqlNode node : (SqlNodeList) sqlNode) {
- translateSqlNodeToAtoms(node, atoms, udfDescriptors, columnNameMap);
+ translateSqlNodeToAtoms(context, node, atoms);
}
} else if (sqlNode instanceof SqlCase) {
- atoms.add(translateSqlCase((SqlCase) sqlNode, udfDescriptors, columnNameMap));
+ atoms.add(translateSqlCase(context, (SqlCase) sqlNode));
}
}
private static Java.Rvalue sqlBasicCallToJaninoRvalue(
- SqlBasicCall sqlBasicCall,
- Java.Rvalue[] atoms,
- List<UserDefinedFunctionDescriptor> udfDescriptors) {
+ Context context, SqlBasicCall sqlBasicCall, Java.Rvalue[] atoms) {
switch (sqlBasicCall.getKind()) {
case AND:
- return generateBinaryOperation(sqlBasicCall, atoms, "&&");
+ return generateBinaryOperation(context, sqlBasicCall, atoms, "&&");
case OR:
- return generateBinaryOperation(sqlBasicCall, atoms, "||");
+ return generateBinaryOperation(context, sqlBasicCall, atoms, "||");
case NOT:
- return generateUnaryOperation("!", atoms[0]);
+ return generateUnaryOperation(context, "!", atoms[0]);
case EQUALS:
- return generateEqualsOperation(sqlBasicCall, atoms);
+ return generateEqualsOperation(context, sqlBasicCall, atoms);
case NOT_EQUALS:
- return generateUnaryOperation("!", generateEqualsOperation(sqlBasicCall, atoms));
+ return generateUnaryOperation(
+ context, "!", generateEqualsOperation(context, sqlBasicCall, atoms));
case IS_NULL:
- return generateUnaryOperation("null == ", atoms[0]);
+ return generateUnaryOperation(context, "null == ", atoms[0]);
case IS_NOT_NULL:
- return generateUnaryOperation("null != ", atoms[0]);
+ return generateUnaryOperation(context, "null != ", atoms[0]);
case IS_FALSE:
case IS_NOT_TRUE:
- return generateUnaryOperation("false == ", atoms[0]);
+ return generateUnaryOperation(context, "false == ", atoms[0]);
case IS_TRUE:
case IS_NOT_FALSE:
- return generateUnaryOperation("true == ", atoms[0]);
+ return generateUnaryOperation(context, "true == ", atoms[0]);
case BETWEEN:
case IN:
case NOT_IN:
@@ -296,49 +282,69 @@ public class JaninoCompiler {
case FLOOR:
case TRIM:
case OTHER_FUNCTION:
- return generateOtherFunctionOperation(sqlBasicCall, atoms, udfDescriptors);
+ return generateOtherFunctionOperation(context, sqlBasicCall, atoms);
case PLUS:
- return generateBinaryOperation(sqlBasicCall, atoms, "+");
+ return generateBinaryOperation(context, sqlBasicCall, atoms, "+");
case MINUS:
- return generateBinaryOperation(sqlBasicCall, atoms, "-");
+ return generateBinaryOperation(context, sqlBasicCall, atoms, "-");
case TIMES:
- return generateBinaryOperation(sqlBasicCall, atoms, "*");
+ return generateBinaryOperation(context, sqlBasicCall, atoms, "*");
case DIVIDE:
- return generateBinaryOperation(sqlBasicCall, atoms, "/");
+ return generateBinaryOperation(context, sqlBasicCall, atoms, "/");
case MOD:
- return generateBinaryOperation(sqlBasicCall, atoms, "%");
+ return generateBinaryOperation(context, sqlBasicCall, atoms, "%");
case LESS_THAN:
case GREATER_THAN:
case LESS_THAN_OR_EQUAL:
case GREATER_THAN_OR_EQUAL:
- return generateCompareOperation(sqlBasicCall, atoms);
+ return generateCompareOperation(context, sqlBasicCall, atoms);
case CAST:
- return generateCastOperation(sqlBasicCall, atoms);
+ return generateCastOperation(context, sqlBasicCall, atoms);
case TIMESTAMP_DIFF:
- return generateTimestampDiffOperation(sqlBasicCall, atoms);
+ return generateTimestampDiffOperation(context, sqlBasicCall, atoms);
case TIMESTAMP_ADD:
- return generateTimestampAddOperation(sqlBasicCall, atoms);
+ return generateTimestampAddOperation(context, sqlBasicCall, atoms);
case OTHER:
- return generateOtherOperation(sqlBasicCall, atoms);
+ return generateOtherOperation(context, sqlBasicCall, atoms);
default:
- throw new ParseException("Unrecognized expression: " + sqlBasicCall.toString());
+ throw new ParseException("Unrecognized expression: " + sqlBasicCall);
}
}
- private static Java.Rvalue generateUnaryOperation(String operator, Java.Rvalue atom) {
+ private static Java.Rvalue generateUnaryOperation(
+ Context context, String operator, Java.Rvalue atom) {
return new Java.UnaryOperation(Location.NOWHERE, operator, atom);
}
+ private static final Map<String, String> decimalArithmeticHandlers =
+ Map.of(
+ "+", "plus",
+ "-", "minus",
+ "*", "times",
+ "/", "divides");
+
private static Java.Rvalue generateBinaryOperation(
- SqlBasicCall sqlBasicCall, Java.Rvalue[] atoms, String operator) {
+ Context context, SqlBasicCall sqlBasicCall, Java.Rvalue[] atoms, String operator) {
if (atoms.length != 2) {
throw new ParseException("Unrecognized expression: " + sqlBasicCall.toString());
}
+ if (decimalArithmeticHandlers.containsKey(operator)) {
+ String handler = decimalArithmeticHandlers.get(operator);
+ DataType resultType =
+ TransformParser.deduceSubExpressionType(
+ context.columns,
+ sqlBasicCall,
+ context.udfDescriptors,
+ context.supportedMetadataColumns);
+ if (resultType.is(DataTypeRoot.DECIMAL)) {
+ return new Java.MethodInvocation(Location.NOWHERE, null, handler, atoms);
+ }
+ }
return new Java.BinaryOperation(Location.NOWHERE, atoms[0], operator, atoms[1]);
}
private static Java.Rvalue generateEqualsOperation(
- SqlBasicCall sqlBasicCall, Java.Rvalue[] atoms) {
+ Context context, SqlBasicCall sqlBasicCall, Java.Rvalue[] atoms) {
if (atoms.length != 2) {
throw new ParseException("Unrecognized expression: " + sqlBasicCall.toString());
}
@@ -347,17 +353,17 @@ public class JaninoCompiler {
}
private static Java.Rvalue generateCastOperation(
- SqlBasicCall sqlBasicCall, Java.Rvalue[] atoms) {
+ Context context, SqlBasicCall sqlBasicCall, Java.Rvalue[] atoms) {
if (atoms.length != 1) {
throw new ParseException("Unrecognized expression: " + sqlBasicCall.toString());
}
List<SqlNode> operandList = sqlBasicCall.getOperandList();
SqlDataTypeSpec sqlDataTypeSpec = (SqlDataTypeSpec) operandList.get(1);
- return generateTypeConvertMethod(sqlDataTypeSpec, atoms);
+ return generateTypeConvertMethod(context, sqlDataTypeSpec, atoms);
}
private static Java.Rvalue generateCompareOperation(
- SqlBasicCall sqlBasicCall, Java.Rvalue[] atoms) {
+ Context context, SqlBasicCall sqlBasicCall, Java.Rvalue[] atoms) {
if (atoms.length != 2) {
throw new ParseException("Unrecognized expression: " + sqlBasicCall.toString());
}
@@ -385,7 +391,7 @@ public class JaninoCompiler {
}
private static Java.Rvalue generateTimestampDiffOperation(
- SqlBasicCall sqlBasicCall, Java.Rvalue[] atoms) {
+ Context context, SqlBasicCall sqlBasicCall, Java.Rvalue[] atoms) {
if (atoms.length != 4) {
throw new ParseException("Unrecognized expression: " + sqlBasicCall.toString());
}
@@ -417,7 +423,7 @@ public class JaninoCompiler {
}
private static Java.Rvalue generateTimestampAddOperation(
- SqlBasicCall sqlBasicCall, Java.Rvalue[] atoms) {
+ Context context, SqlBasicCall sqlBasicCall, Java.Rvalue[] atoms) {
if (atoms.length != 4) {
throw new ParseException("Unrecognized expression: " + sqlBasicCall.toString());
}
@@ -448,13 +454,13 @@ public class JaninoCompiler {
timestampDiffFunctionParam.toArray(new Java.Rvalue[0]));
}
- private static Java.Rvalue generateCharLengthOperation(Java.Rvalue[] atoms) {
+ private static Java.Rvalue generateCharLengthOperation(Context context, Java.Rvalue[] atoms) {
return new Java.MethodInvocation(
Location.NOWHERE, null, StringUtils.convertToCamelCase("CHAR_LENGTH"), atoms);
}
private static Java.Rvalue generateOtherOperation(
- SqlBasicCall sqlBasicCall, Java.Rvalue[] atoms) {
+ Context context, SqlBasicCall sqlBasicCall, Java.Rvalue[] atoms) {
if (sqlBasicCall.getOperator().getName().equals("||")) {
return new Java.MethodInvocation(
Location.NOWHERE, null, StringUtils.convertToCamelCase("CONCAT"), atoms);
@@ -463,9 +469,7 @@ public class JaninoCompiler {
}
private static Java.Rvalue generateOtherFunctionOperation(
- SqlBasicCall sqlBasicCall,
- Java.Rvalue[] atoms,
- List<UserDefinedFunctionDescriptor> udfDescriptors) {
+ Context context, SqlBasicCall sqlBasicCall, Java.Rvalue[] atoms) {
String operationName = sqlBasicCall.getOperator().getName().toUpperCase();
if (operationName.equals("IF")) {
if (atoms.length == 3) {
@@ -476,7 +480,7 @@ public class JaninoCompiler {
}
} else {
Optional<UserDefinedFunctionDescriptor> udfFunctionOptional =
- udfDescriptors.stream()
+ context.udfDescriptors.stream()
.filter(e -> e.getName().equalsIgnoreCase(operationName))
.findFirst();
return udfFunctionOptional
@@ -498,7 +502,8 @@ public class JaninoCompiler {
}
}
- private static Java.Rvalue generateTimezoneFreeTemporalFunctionOperation(String operationName) {
+ private static Java.Rvalue generateTimezoneFreeTemporalFunctionOperation(
+ Context context, String operationName) {
return new Java.MethodInvocation(
Location.NOWHERE,
null,
@@ -509,7 +514,7 @@ public class JaninoCompiler {
}
private static Java.Rvalue generateTimezoneRequiredTemporalFunctionOperation(
- String operationName) {
+ Context context, String operationName) {
List<Java.Rvalue> timestampFunctionParam = new ArrayList<>();
timestampFunctionParam.add(
new Java.AmbiguousName(Location.NOWHERE, new String[] {DEFAULT_EPOCH_TIME}));
@@ -523,7 +528,7 @@ public class JaninoCompiler {
}
private static Java.Rvalue generateTimezoneFreeTemporalConversionFunctionOperation(
- String operationName) {
+ Context context, String operationName) {
return new Java.MethodInvocation(
Location.NOWHERE,
null,
@@ -532,7 +537,7 @@ public class JaninoCompiler {
}
private static Java.Rvalue generateTimezoneRequiredTemporalConversionFunctionOperation(
- String operationName) {
+ Context context, String operationName) {
return new Java.MethodInvocation(
Location.NOWHERE,
null,
@@ -543,7 +548,7 @@ public class JaninoCompiler {
}
private static Java.Rvalue generateTypeConvertMethod(
- SqlDataTypeSpec sqlDataTypeSpec, Java.Rvalue[] atoms) {
+ Context context, SqlDataTypeSpec sqlDataTypeSpec, Java.Rvalue[] atoms) {
switch (sqlDataTypeSpec.getTypeName().getSimple().toUpperCase()) {
case "BOOLEAN":
return new Java.MethodInvocation(Location.NOWHERE, null, "castToBoolean", atoms);
@@ -614,4 +619,39 @@ public class JaninoCompiler {
return String.format("__instanceOf%s.eval", udfFunction.getClassName());
}
}
+
+ /** Contextual information for {@link JaninoCompiler}. */
+ public static class Context {
+
+ // Upstream physical columns
+ public final List<Column> columns;
+
+ // Map from original column names to mangled names ($0, $1, ...)
+ public final Map<String, String> columnNameMap;
+
+ // User defined function signatures
+ public final List<UserDefinedFunctionDescriptor> udfDescriptors;
+
+ // Readable metadata columns
+ public final SupportedMetadataColumn[] supportedMetadataColumns;
+
+ private Context(
+ List<Column> columns,
+ Map<String, String> columnNameMap,
+ List<UserDefinedFunctionDescriptor> udfDescriptors,
+ SupportedMetadataColumn[] supportedMetadataColumns) {
+ this.columns = columns;
+ this.columnNameMap = columnNameMap;
+ this.udfDescriptors = udfDescriptors;
+ this.supportedMetadataColumns = supportedMetadataColumns;
+ }
+
+ public static Context of(
+ List<Column> columns,
+ Map<String, String> columnNameMap,
+ List<UserDefinedFunctionDescriptor> udfDescriptors,
+ SupportedMetadataColumn[] supportedMetadataColumns) {
+ return new Context(columns, columnNameMap, udfDescriptors, supportedMetadataColumns);
+ }
+ }
}
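For reviewers skimming the patch: the change above replaces the repeated `(udfDescriptors, columnNameMap)` parameter pairs with a single immutable `Context` parameter object that is threaded through every translation helper. A minimal, self-contained sketch of that pattern (class and field names here are illustrative, not the actual Flink CDC API):

```java
import java.util.Map;

public class ContextSketch {
    // Immutable parameter object, mirroring the shape of JaninoCompiler.Context.
    static final class Context {
        final Map<String, String> columnNameMap;

        private Context(Map<String, String> columnNameMap) {
            this.columnNameMap = columnNameMap;
        }

        static Context of(Map<String, String> columnNameMap) {
            return new Context(columnNameMap);
        }
    }

    // Before the refactor each helper took udfDescriptors and columnNameMap
    // separately; after it, every helper takes the one Context and reads
    // whichever field it needs.
    static String translateIdentifier(Context ctx, String columnName) {
        return ctx.columnNameMap.getOrDefault(columnName, columnName);
    }

    public static void main(String[] args) {
        Context ctx = Context.of(Map.of("a", "$0", "b", "$1"));
        System.out.println(translateIdentifier(ctx, "a"));       // prints $0
        System.out.println(translateIdentifier(ctx, "missing")); // prints missing
    }
}
```

The payoff is visible throughout the diff: adding `columns` and `supportedMetadataColumns` to the compiler required changing only the `Context` class, not every helper signature.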
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java
index 3369c482d..5ee715301 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java
@@ -348,7 +348,12 @@ public class TransformParser {
relDataType),
exprNode.toString(),
JaninoCompiler.translateSqlNodeToJaninoExpression(
- exprNode, udfDescriptors, columnNameMap),
+ JaninoCompiler.Context.of(
+ columns,
+ columnNameMap,
+ udfDescriptors,
+ supportedMetadataColumns),
+ exprNode),
originalColumnNames,
columnNameMap);
}
@@ -423,7 +428,9 @@ public class TransformParser {
public static String translateFilterExpressionToJaninoExpression(
String filterExpression,
+ List<Column> columns,
List<UserDefinedFunctionDescriptor> udfDescriptors,
+ SupportedMetadataColumn[] supportedMetadataColumns,
Map<String, String> columnNameMap) {
if (isNullOrWhitespaceOnly(filterExpression)) {
return "";
@@ -434,7 +441,9 @@ public class TransformParser {
}
SqlNode where = sqlSelect.getWhere();
return JaninoCompiler.translateSqlNodeToJaninoExpression(
- where, udfDescriptors, columnNameMap);
+ JaninoCompiler.Context.of(
+ columns, columnNameMap, udfDescriptors, supportedMetadataColumns),
+ where);
}
public static List<String> parseComputedColumnNames(
@@ -597,4 +606,37 @@ public class TransformParser {
}
return columnNameMap;
}
+
+ public static DataType deduceSubExpressionType(
+ List<Column> columns,
+ SqlNode subExpression,
+ List<UserDefinedFunctionDescriptor> udfDescriptors,
+ SupportedMetadataColumn[] supportedMetadataColumns) {
+ SqlSelect sqlSelect =
+ new SqlSelect(
+ SqlParserPos.QUOTED_ZERO,
+ SqlNodeList.EMPTY,
+ SqlNodeList.of(subExpression),
+ new SqlIdentifier(DEFAULT_TABLE, SqlParserPos.QUOTED_ZERO),
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null);
+ RelNode relNode = sqlToRel(columns, sqlSelect, udfDescriptors, supportedMetadataColumns);
+ RelDataType[] relDataTypes =
+ relNode.getRowType().getFieldList().stream()
+ .map(RelDataTypeField::getType)
+ .toArray(RelDataType[]::new);
+ Preconditions.checkArgument(
+ relDataTypes.length == 1,
+ "RelDataType %s should be unary from SqlNode %s",
+ relDataTypes,
+ sqlSelect);
+ RelDataType expressionType = relDataTypes[0];
+ return CalciteDataTypeConverter.convertCalciteRelDataTypeToDataType(expressionType);
+ }
}
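The new `deduceSubExpressionType` above is what lets `generateBinaryOperation` detect DECIMAL-typed arithmetic and emit calls to `plus`/`minus`/`times`/`divides` helpers instead of Java's native operators, which `BigDecimal` does not support. A standalone sketch of why that method dispatch is needed (the helper bodies below are assumptions; the patch itself only fixes the handler names):

```java
import java.math.BigDecimal;
import java.math.MathContext;

public class DecimalDispatchSketch {
    // Java operators (+, -, *, /) are not defined for BigDecimal, so generated
    // Janino expressions must call methods instead. These bodies mirror the
    // handler map {"+": "plus", "-": "minus", "*": "times", "/": "divides"}.
    static BigDecimal plus(BigDecimal a, BigDecimal b) { return a.add(b); }
    static BigDecimal minus(BigDecimal a, BigDecimal b) { return a.subtract(b); }
    static BigDecimal times(BigDecimal a, BigDecimal b) { return a.multiply(b); }
    static BigDecimal divides(BigDecimal a, BigDecimal b) {
        // Rounding context is an assumption for non-terminating quotients.
        return a.divide(b, MathContext.DECIMAL128);
    }

    public static void main(String[] args) {
        // An expression like "price * quantity" over DECIMAL columns would be
        // compiled to times(price, quantity) rather than price * quantity.
        System.out.println(times(new BigDecimal("1.5"), new BigDecimal("4"))); // prints 6.0
    }
}
```

For non-DECIMAL operands the compiler still emits a plain `Java.BinaryOperation`, so primitive arithmetic is unchanged.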
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java
index 106b50524..468566b82 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java
@@ -241,6 +241,8 @@ public class TransformSqlOperatorTable extends ReflectiveSqlOperatorTable {
InferTypes.RETURN_TYPE,
OperandTypes.or(
OperandTypes.family(SqlTypeFamily.TIMESTAMP, SqlTypeFamily.STRING),
+ OperandTypes.family(SqlTypeFamily.DATE, SqlTypeFamily.STRING),
+ OperandTypes.family(SqlTypeFamily.TIME, SqlTypeFamily.STRING),
OperandTypes.family(SqlTypeFamily.STRING, SqlTypeFamily.STRING)),
SqlFunctionCategory.TIMEDATE);
public static final SqlFunction TIMESTAMP_DIFF =
@@ -298,6 +300,50 @@ public class TransformSqlOperatorTable extends ReflectiveSqlOperatorTable {
OperandTypes.family(SqlTypeFamily.CHARACTER),
OperandTypes.family(SqlTypeFamily.CHARACTER, SqlTypeFamily.CHARACTER)),
SqlFunctionCategory.TIMEDATE);
+ public static final SqlFunction TO_TIMESTAMP_LTZ =
+ new SqlFunction(
+ "TO_TIMESTAMP_LTZ",
+ SqlKind.OTHER_FUNCTION,
+ ReturnTypes.cascade(
+ ReturnTypes.explicit(SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE, 3),
+ SqlTypeTransforms.FORCE_NULLABLE),
+ null,
+ OperandTypes.or(
+ OperandTypes.family(SqlTypeFamily.INTEGER),
+ OperandTypes.family(SqlTypeFamily.INTEGER, SqlTypeFamily.INTEGER),
+ OperandTypes.family(SqlTypeFamily.CHARACTER),
+ OperandTypes.family(SqlTypeFamily.CHARACTER, SqlTypeFamily.CHARACTER),
+ OperandTypes.family(
+ SqlTypeFamily.CHARACTER,
+ SqlTypeFamily.CHARACTER,
+ SqlTypeFamily.CHARACTER)),
+ SqlFunctionCategory.TIMEDATE);
+
+ public static final SqlFunction DATE_FORMAT_TZ =
+ new SqlFunction(
+ "DATE_FORMAT_TZ",
+ SqlKind.OTHER_FUNCTION,
+ TransformSqlReturnTypes.VARCHAR_FORCE_NULLABLE,
+ null,
+ OperandTypes.or(
+ OperandTypes.family(SqlTypeFamily.TIMESTAMP, SqlTypeFamily.CHARACTER),
+ OperandTypes.family(
+ SqlTypeFamily.TIMESTAMP,
+ SqlTypeFamily.CHARACTER,
+ SqlTypeFamily.CHARACTER)),
+ SqlFunctionCategory.TIMEDATE);
+
+ public static final SqlFunction DATE_ADD =
+ new SqlFunction(
+ "DATE_ADD",
+ SqlKind.OTHER_FUNCTION,
+ TransformSqlReturnTypes.VARCHAR_FORCE_NULLABLE,
+ null,
+ OperandTypes.or(
+ OperandTypes.family(SqlTypeFamily.TIMESTAMP, SqlTypeFamily.INTEGER),
+ OperandTypes.family(SqlTypeFamily.DATE, SqlTypeFamily.INTEGER),
+ OperandTypes.family(SqlTypeFamily.CHARACTER, SqlTypeFamily.INTEGER)),
+ SqlFunctionCategory.TIMEDATE);
// ---------------------
// Conditional Functions
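The newly registered `DATE_ADD` accepts a TIMESTAMP, DATE, or CHARACTER first operand plus an INTEGER and returns a nullable VARCHAR. Assuming MySQL-style semantics where the integer counts days (the operator table only fixes the signature, so the day-based behavior below is an assumption), an evaluator could look like:

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

public class DateAddSketch {
    // Hypothetical evaluator for DATE_ADD(date, days). The operator table in
    // the patch only constrains the types (TIMESTAMP/DATE/CHARACTER + INTEGER
    // -> nullable VARCHAR); the add-N-days semantics here are an assumption.
    static String dateAdd(String date, int days) {
        if (date == null) {
            return null; // consistent with the FORCE_NULLABLE return type
        }
        return LocalDate.parse(date)
                .plusDays(days)
                .format(DateTimeFormatter.ISO_LOCAL_DATE);
    }

    public static void main(String[] args) {
        System.out.println(dateAdd("2026-02-02", 28)); // prints 2026-03-02
    }
}
```

Registering the operand families in Calcite is what allows the validator to accept `DATE_ADD(order_date, 7)` in transform rules before the runtime evaluator is ever invoked.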
diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java
index 94acbffaf..3f1d02686 100644
--- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java
+++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java
@@ -398,6 +398,8 @@ class TransformParserTest {
TransformParser.translateFilterExpressionToJaninoExpression(
"TIMESTAMPDIFF(SECONDS, dt1, dt2)",
Collections.emptyList(),
+ Collections.emptyList(),
+ new SupportedMetadataColumn[0],
Collections.emptyMap());
})
.isExactlyInstanceOf(ParseException.class)
@@ -407,6 +409,8 @@ class TransformParserTest {
TransformParser.translateFilterExpressionToJaninoExpression(
"TIMESTAMPDIFF(QUARTER, dt1, dt2)",
Collections.emptyList(),
+ Collections.emptyList(),
+ new SupportedMetadataColumn[0],
Collections.emptyMap());
})
.isExactlyInstanceOf(ParseException.class)
@@ -417,6 +421,8 @@ class TransformParserTest {
TransformParser.translateFilterExpressionToJaninoExpression(
"TIMESTAMPADD(SECONDS, dt1, dt2)",
Collections.emptyList(),
+ Collections.emptyList(),
+ new SupportedMetadataColumn[0],
Collections.emptyMap());
})
.isExactlyInstanceOf(ParseException.class)
@@ -426,6 +432,8 @@ class TransformParserTest {
TransformParser.translateFilterExpressionToJaninoExpression(
"TIMESTAMPADD(QUARTER, dt1, dt2)",
Collections.emptyList(),
+ Collections.emptyList(),
+ new SupportedMetadataColumn[0],
Collections.emptyMap());
})
.isExactlyInstanceOf(ParseException.class)
@@ -622,6 +630,12 @@ class TransformParserTest {
@Test
public void testTranslateUdfFilterToJaninoExpressionWithColumnNameMap() {
+ List<Column> columns =
+ List.of(
+ Column.physicalColumn("a", DataTypes.INT()),
+ Column.physicalColumn("b", DataTypes.INT()),
+ Column.physicalColumn("a-b", DataTypes.INT()));
+
Map<String, String> columnNameMap = new HashMap<>();
columnNameMap.put("a", "$0");
columnNameMap.put("b", "$1");
@@ -630,42 +644,52 @@ class TransformParserTest {
testFilterExpressionWithUdf(
"format(upper(a))",
"__instanceOfFormatFunctionClass.eval(upper($0))",
+ columns,
columnNameMap);
testFilterExpressionWithUdf(
"format(lower(b))",
"__instanceOfFormatFunctionClass.eval(lower($1))",
+ columns,
columnNameMap);
testFilterExpressionWithUdf(
"format(concat(a,b))",
"__instanceOfFormatFunctionClass.eval(concat($0, $1))",
+ columns,
columnNameMap);
testFilterExpressionWithUdf(
"format(SUBSTR(`a-b`,1))",
"__instanceOfFormatFunctionClass.eval(substr($2, 1))",
+ columns,
columnNameMap);
testFilterExpressionWithUdf(
"typeof(`a-b` like '^[a-zA-Z]')",
"__instanceOfTypeOfFunctionClass.eval(like($2, \"^[a-zA-Z]\"))",
+ columns,
columnNameMap);
testFilterExpressionWithUdf(
"typeof(`a-b` not like '^[a-zA-Z]')",
"__instanceOfTypeOfFunctionClass.eval(notLike($2, \"^[a-zA-Z]\"))",
+ columns,
columnNameMap);
testFilterExpressionWithUdf(
"typeof(a-b-`a-b`)",
"__instanceOfTypeOfFunctionClass.eval($0 - $1 - $2)",
+ columns,
columnNameMap);
testFilterExpressionWithUdf(
"typeof(a-b-2)",
"__instanceOfTypeOfFunctionClass.eval($0 - $1 - 2)",
+ columns,
columnNameMap);
testFilterExpressionWithUdf(
"addone(addone(`a-b`)) > 4 OR typeof(a-b) <> 'bool' AND format('from %s to %s is %s', 'a', 'z', 'lie') <> ''",
"greaterThan(__instanceOfAddOneFunctionClass.eval(__instanceOfAddOneFunctionClass.eval($2)), 4) || !valueEquals(__instanceOfTypeOfFunctionClass.eval($0 - $1), \"bool\") && !valueEquals(__instanceOfFormatFunctionClass.eval(\"from %s to %s is %s\", \"a\", \"z\", \"lie\"), \"\")",
+ columns,
columnNameMap);
testFilterExpressionWithUdf(
"ADDONE(ADDONE(`a-b`)) > 4 OR TYPEOF(a-b) <> 'bool' AND FORMAT('from %s to %s is %s', 'a', 'z', 'lie') <> ''",
"greaterThan(__instanceOfAddOneFunctionClass.eval(__instanceOfAddOneFunctionClass.eval($2)), 4) || !valueEquals(__instanceOfTypeOfFunctionClass.eval($0 - $1), \"bool\") && !valueEquals(__instanceOfFormatFunctionClass.eval(\"from %s to %s is %s\", \"a\", \"z\", \"lie\"), \"\")",
+ columns,
columnNameMap);
}
@@ -687,6 +711,8 @@ class TransformParserTest {
TransformParser.translateFilterExpressionToJaninoExpression(
"id > 9223372036854775808",
Collections.emptyList(),
+ Collections.emptyList(),
+ new SupportedMetadataColumn[0],
Collections.emptyMap()))
.isExactlyInstanceOf(CalciteContextException.class)
.hasMessageContaining("Numeric literal '9223372036854775808'
out of range");
@@ -696,6 +722,8 @@ class TransformParserTest {
TransformParser.translateFilterExpressionToJaninoExpression(
"id < -9223372036854775809",
Collections.emptyList(),
+ Collections.emptyList(),
+ new SupportedMetadataColumn[0],
Collections.emptyMap()))
.isExactlyInstanceOf(CalciteContextException.class)
.hasMessageContaining("Numeric literal '-9223372036854775809'
out of range");
@@ -776,22 +804,34 @@ class TransformParserTest {
}
}
+ private static final List<Column> DUMMY_COLUMNS =
+ List.of(Column.physicalColumn("id", DataTypes.INT()));
+
private void testFilterExpression(String expression, String expressionExpect) {
String janinoExpression =
TransformParser.translateFilterExpressionToJaninoExpression(
- expression, Collections.emptyList(), Collections.emptyMap());
+ expression,
+ DUMMY_COLUMNS,
+ Collections.emptyList(),
+ new SupportedMetadataColumn[0],
+ Collections.emptyMap());
Assertions.assertThat(janinoExpression).isEqualTo(expressionExpect);
}
private void testFilterExpressionWithUdf(String expression, String expressionExpect) {
- testFilterExpressionWithUdf(expression, expressionExpect, Collections.emptyMap());
+ testFilterExpressionWithUdf(
+ expression, expressionExpect, DUMMY_COLUMNS, Collections.emptyMap());
}
private void testFilterExpressionWithUdf(
- String expression, String expressionExpect, Map<String, String> columnNameMap) {
+ String expression,
+ String expressionExpect,
+ List<Column> columns,
+ Map<String, String> columnNameMap) {
String janinoExpression =
TransformParser.translateFilterExpressionToJaninoExpression(
expression,
+ columns,
Arrays.asList(
new UserDefinedFunctionDescriptor(
"format",
@@ -802,6 +842,7 @@ class TransformParserTest {
new UserDefinedFunctionDescriptor(
"typeof",
"org.apache.flink.cdc.udf.examples.java.TypeOfFunctionClass")),
+ new SupportedMetadataColumn[0],
columnNameMap);
Assertions.assertThat(janinoExpression).isEqualTo(expressionExpect);
}