This is an automated email from the ASF dual-hosted git repository.

kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 4271144fa [FLINK-38906] Pass transform parser in context and polish 
built-in functions (#4248)
4271144fa is described below

commit 4271144faa0c7874122a07a8c97faca2d923f6be
Author: yuxiqian <[email protected]>
AuthorDate: Mon Feb 2 11:31:34 2026 +0800

    [FLINK-38906] Pass transform parser in context and polish built-in 
functions (#4248)
    
    Co-authored-by: Jia Fan <[email protected]>
---
 docs/content.zh/docs/core-concept/transform.md     |  29 +--
 docs/content/docs/core-concept/transform.md        | 145 +++++++-------
 .../flink/FlinkPipelineTransformITCase.java        |  11 +-
 .../src/test/resources/specs/comparison.yaml       |   4 -
 .../src/test/resources/specs/decimal.yaml          |   8 -
 .../src/test/resources/specs/meta.yaml             |  12 --
 .../src/test/resources/specs/temporal.yaml         |  12 --
 .../functions/impl/ComparisonFunctions.java        |  33 +++-
 .../operators/transform/PostTransformOperator.java |   2 +-
 .../operators/transform/PreTransformOperator.java  |   2 +-
 .../transform/ProjectionColumnProcessor.java       |   3 +-
 .../transform/TransformExpressionCompiler.java     |  16 +-
 .../transform/TransformExpressionKey.java          |  28 +--
 .../operators/transform/TransformFilter.java       |  24 +--
 .../transform/TransformFilterProcessor.java        |  32 ++-
 .../flink/cdc/runtime/parser/JaninoCompiler.java   | 218 ++++++++++++---------
 .../flink/cdc/runtime/parser/TransformParser.java  |  46 ++++-
 .../parser/metadata/TransformSqlOperatorTable.java |  46 +++++
 .../cdc/runtime/parser/TransformParserTest.java    |  47 ++++-
 19 files changed, 447 insertions(+), 271 deletions(-)

diff --git a/docs/content.zh/docs/core-concept/transform.md 
b/docs/content.zh/docs/core-concept/transform.md
index 5b56057ce..aec40d70a 100644
--- a/docs/content.zh/docs/core-concept/transform.md
+++ b/docs/content.zh/docs/core-concept/transform.md
@@ -142,18 +142,18 @@ Flink CDC 使用 [Calcite](https://calcite.apache.org/) 
来解析表达式并且
 
 ## 数学函数
 
-| Function            | Janino Code         | Description                      
                                                                                
                                                    |
-|---------------------|---------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| numeric1 + numeric2 | numeric1 + numeric2 | Returns NUMERIC1 plus NUMERIC2.  
                                                                                
                                                    |
-| numeric1 - numeric2 | numeric1 - numeric2 | Returns NUMERIC1 minus NUMERIC2. 
                                                                                
                                                    |
-| numeric1 * numeric2 | numeric1 * numeric2 | Returns NUMERIC1 multiplied by 
NUMERIC2.                                                                       
                                                      |
-| numeric1 / numeric2 | numeric1 / numeric2 | Returns NUMERIC1 divided by 
NUMERIC2.                                                                       
                                                         |
-| numeric1 % numeric2 | numeric1 % numeric2 | Returns the remainder (modulus) 
of numeric1 divided by numeric2.                                                
                                                     |
-| ABS(numeric)        | abs(numeric)        | Returns the absolute value of 
numeric.                                                                        
                                                       |
-| CEIL(numeric)<br/>CEILING(numeric)       | ceil(numeric)       | Rounds 
numeric up, and returns the smallest number that is greater than or equal to 
numeric.                                                                        
 |
-| FLOOR(numeric)      | floor(numeric)      | Rounds numeric down, and returns 
the largest number that is less than or equal to numeric.                       
                                                    |
-| ROUND(numeric, int) | round(numeric)      | Returns a number rounded to INT 
decimal places for NUMERIC.                                                     
                                                     |
-| UUID()              | uuid()              | Returns an UUID (Universally 
Unique Identifier) string (e.g., "3d3c68f7-f608-473f-b60c-b0c44ad4cc4e") 
according to RFC 4122 type 4 (pseudo randomly generated) UUID. |
+| Function                           | Janino Code         | Description       
                                                                                
                                                                   |
+|------------------------------------|---------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| numeric1 + numeric2                | numeric1 + numeric2 | Returns NUMERIC1 
plus NUMERIC2.                                                                  
                                                                    |
+| numeric1 - numeric2                | numeric1 - numeric2 | Returns NUMERIC1 
minus NUMERIC2.                                                                 
                                                                    |
+| numeric1 * numeric2                | numeric1 * numeric2 | Returns NUMERIC1 
multiplied by NUMERIC2.                                                         
                                                                    |
+| numeric1 / numeric2                | numeric1 / numeric2 | Returns NUMERIC1 
divided by NUMERIC2.                                                            
                                                                    |
+| numeric1 % numeric2                | numeric1 % numeric2 | Returns the 
remainder (modulus) of numeric1 divided by numeric2.                            
                                                                         |
+| ABS(numeric)                       | abs(numeric)        | Returns the 
absolute value of numeric.                                                      
                                                                         |
+| CEIL(numeric)<br/>CEILING(numeric) | ceil(numeric)       | Rounds numeric 
up, and returns the smallest number that is greater than or equal to numeric.   
                                                                      |
+| FLOOR(numeric)                     | floor(numeric)      | Rounds numeric 
down, and returns the largest number that is less than or equal to numeric.     
                                                                      |
+| ROUND(numeric, int)                | round(numeric)      | Returns a number 
rounded to INT decimal places for NUMERIC.                                      
                                                                    |
+| UUID()                             | uuid()              | Returns an UUID 
(Universally Unique Identifier) string (e.g., 
"3d3c68f7-f608-473f-b60c-b0c44ad4cc4e") according to RFC 4122 type 4 (pseudo 
randomly generated) UUID. |
 
 ## 字符串函数
 
@@ -180,13 +180,18 @@ Flink CDC 使用 [Calcite](https://calcite.apache.org/) 
来解析表达式并且
 | CURRENT_TIMESTAMP                                    | currentTimestamp()    
                               | Returns the current SQL timestamp in the local 
time zone, the return type is TIMESTAMP_LTZ(3).                                 
                                                                                
                                                                                
                                                                                
              [...]
 | NOW()                                                | now()                 
                               | Returns the current SQL timestamp in the local 
time zone, this is a synonym of CURRENT_TIMESTAMP.                              
                                                                                
                                                                                
                                                                                
              [...]
 | DATE_FORMAT(timestamp, string)                       | dateFormat(timestamp, 
string)                        | Converts timestamp to a value of string in the 
format specified by the date format string. The format string is compatible 
with Java's SimpleDateFormat.                                                   
                                                                                
                                                                                
                  [...]
+| DATE_FORMAT(date, string)                            | dateFormat(date, 
string)                             | Converts given date to a value of string 
in the format specified by the format string. The format string is compatible 
with Java's SimpleDateFormat.                                                   
                                                                                
                                                                                
                      [...]
+| DATE_FORMAT(time, string)                            | dateFormat(time, 
string)                             | Converts given time to a value of string 
in the format specified by the format string. The format string is compatible 
with Java's SimpleDateFormat.                                                   
                                                                                
                                                                                
                      [...]
+| DATE_FORMAT_TZ(timestamp, format, timezone)          | 
dateFormatTz(timestamp, format, timezone)            | Formats a timestamp or 
datetime value as a string using the given pattern and the specified time zone. 
The timezone argument can be a time zone ID (for example, 'UTC', 
'Asia/Shanghai') or an offset such as '+08:00'.                                 
                                                                                
                                                     [...]
 | TIMESTAMPADD(timeintervalunit, interval, timepoint)  | 
timestampadd(timeintervalunit, interval, timepoint)  | Returns the timestamp of 
timepoint2 after timepoint added interval. The unit for the interval is given 
by the first argument, which should be one of the following values: SECOND, 
MINUTE, HOUR, DAY, MONTH, or YEAR.                                              
                                                                                
                                          [...]
 | TIMESTAMPDIFF(timepointunit, timepoint1, timepoint2) | 
timestampDiff(timepointunit, timepoint1, timepoint2) | Returns the (signed) 
number of timepointunit between timepoint1 and timepoint2. The unit for the 
interval is given by the first argument, which should be one of the following 
values: SECOND, MINUTE, HOUR, DAY, MONTH, or YEAR.                              
                                                                                
                                              [...]
 | TO_DATE(string1[, string2])                          | toDate(string1[, 
string2])                           | Converts a date string string1 with 
format string2 (by default 'yyyy-MM-dd') to a date.                             
                                                                                
                                                                                
                                                                                
                         [...]
 | TO_TIMESTAMP(string1[, string2])                     | toTimestamp(string1[, 
string2])                      | Converts date time string string1 with format 
string2 (by default: 'yyyy-MM-dd HH:mm:ss') to a timestamp, without time zone.  
                                                                                
                                                                                
                                                                                
               [...]
+| TO_TIMESTAMP_LTZ(string1[, string2])                 | 
toTimestampLtz(string1[, string2])                   | Converts date time 
string string1 with format string2 (by default: 'yyyy-MM-dd HH:mm:ss') to a 
timestamp, with local time zone.                                                
                                                                                
                                                                                
                                              [...]
 | FROM_UNIXTIME(numeric[, string])                     | 
fromUnixtime(NUMERIC[, STRING])                      | Returns a representation 
of the numeric argument as a value in string format (default is ‘yyyy-MM-dd 
HH:mm:ss’). numeric is an internal timestamp value representing seconds since 
‘1970-01-01 00:00:00’ UTC, such as produced by the UNIX_TIMESTAMP() function. 
The return value is expressed in the session time zone (specified in 
TableConfig). E.g., FROM_UNIXTIME(44) returns ‘1970-01 [...]
 | UNIX_TIMESTAMP()                                     | unixTimestamp()       
                               | Gets current Unix timestamp in seconds. This 
function is not deterministic which means the value would be recalculated for 
each record.                                                                    
                                                                                
                                                                                
                  [...]
 | UNIX_TIMESTAMP(string1[, string2])                   | 
unixTimestamp(STRING1[, STRING2])                    | Converts a date time 
string string1 with format string2 (by default: yyyy-MM-dd HH:mm:ss if not 
specified) to Unix timestamp (in seconds), using the specified timezone in 
table config.<br/>If a time zone is specified in the date time string and 
parsed by UTC+X format such as “yyyy-MM-dd HH:mm:ss.SSS X”, this function will 
use the specified timezone in the date time string inste [...]
+| DATE_ADD(date, int)                                  | dateAdd(date, int)    
                               | Adds N days to the given date and returns a 
string in 'yyyy-MM-dd' format.                                                  
                                                                                
                                                                                
                                                                                
                 [...]
 
 ## 条件函数
 
diff --git a/docs/content/docs/core-concept/transform.md 
b/docs/content/docs/core-concept/transform.md
index 024401782..584c3606d 100644
--- a/docs/content/docs/core-concept/transform.md
+++ b/docs/content/docs/core-concept/transform.md
@@ -112,91 +112,96 @@ Flink CDC uses [Calcite](https://calcite.apache.org/) to 
parse expressions and [
 
 ## Comparison Functions
 
-| Function             | Janino Code                                  | 
Description                                                     |
-|----------------------|----------------------------------------------|-----------------------------------------------------------------|
-| value1 = value2      | valueEquals(value1, value2)                  | 
Returns TRUE if value1 is equal to value2; returns FALSE if value1 or value2 is 
NULL. |
-| value1 <> value2     | !valueEquals(value1, value2)                 | 
Returns TRUE if value1 is not equal to value2; returns FALSE if value1 or 
value2 is NULL. |
-| value1 > value2      | greaterThan(value1, value2)                  | 
Returns TRUE if value1 is greater than value2; returns FALSE if value1 or 
value2 is NULL. |
-| value1 >= value2     | greaterThanOrEqual(value1, value2)               | 
Returns TRUE if value1 is greater than or equal to value2; returns FALSE if 
value1 or value2 is NULL. |
-| value1 < value2      | lessThan(value1, value2)                         | 
Returns TRUE if value1 is less than value2; returns FALSE if value1 or value2 
is NULL. |
-| value1 <= value2     | lessThanOrEqual(value1, value2)                  | 
Returns TRUE if value1 is less than or equal to value2; returns FALSE if value1 
or value2 is NULL. |
-| value IS NULL        | null == value                                | 
Returns TRUE if value is NULL.                                  |
-| value IS NOT NULL    | null != value                                | 
Returns TRUE if value is not NULL.                              |
-| value1 BETWEEN value2 AND value3 | betweenAsymmetric(value1, value2, value3) 
   | Returns TRUE if value1 is greater than or equal to value2 and less than or 
equal to value3. |
-| value1 NOT BETWEEN value2 AND value3 | notBetweenAsymmetric(value1, value2, 
value3) | Returns TRUE if value1 is less than value2 or greater than value3. |
-| string1 LIKE string2 | like(string1, string2)                       | 
Returns TRUE if string1 matches pattern string2.                |
-| string1 NOT LIKE string2 | notLike(string1, string2)                    | 
Returns TRUE if string1 does not match pattern string2.       |
-| value1 IN (value2 [, value3]* ) | in(value1, value2 [, value3]*)             
  | Returns TRUE if value1 exists in the given list (value2, value3, …). |
-| value1 NOT IN (value2 [, value3]* ) | notIn(value1, value2 [, value3]*)      
      | Returns TRUE if value1 does not exist in the given list (value2, 
value3, …).  |
+| Function                             | Janino Code                           
       | Description                                                            
                               |
+|--------------------------------------|----------------------------------------------|-------------------------------------------------------------------------------------------------------|
+| value1 = value2                      | valueEquals(value1, value2)           
       | Returns TRUE if value1 is equal to value2; returns FALSE if value1 or 
value2 is NULL.                 |
+| value1 <> value2                     | !valueEquals(value1, value2)          
       | Returns TRUE if value1 is not equal to value2; returns FALSE if value1 
or value2 is NULL.             |
+| value1 > value2                      | greaterThan(value1, value2)           
       | Returns TRUE if value1 is greater than value2; returns FALSE if value1 
or value2 is NULL.             |
+| value1 >= value2                     | greaterThanOrEqual(value1, value2)    
       | Returns TRUE if value1 is greater than or equal to value2; returns 
FALSE if value1 or value2 is NULL. |
+| value1 < value2                      | lessThan(value1, value2)              
       | Returns TRUE if value1 is less than value2; returns FALSE if value1 or 
value2 is NULL.                |
+| value1 <= value2                     | lessThanOrEqual(value1, value2)       
       | Returns TRUE if value1 is less than or equal to value2; returns FALSE 
if value1 or value2 is NULL.    |
+| value IS NULL                        | null == value                         
       | Returns TRUE if value is NULL.                                         
                               |
+| value IS NOT NULL                    | null != value                         
       | Returns TRUE if value is not NULL.                                     
                               |
+| value1 BETWEEN value2 AND value3     | betweenAsymmetric(value1, value2, 
value3)    | Returns TRUE if value1 is greater than or equal to value2 and less 
than or equal to value3.           |
+| value1 NOT BETWEEN value2 AND value3 | notBetweenAsymmetric(value1, value2, 
value3) | Returns TRUE if value1 is less than value2 or greater than value3.    
                                |
+| string1 LIKE string2                 | like(string1, string2)                
       | Returns TRUE if string1 matches pattern string2.                       
                               |
+| string1 NOT LIKE string2             | notLike(string1, string2)             
       | Returns TRUE if string1 does not match pattern string2.                
                               |
+| value1 IN (value2 [, value3]* )      | in(value1, value2 [, value3]*)        
       | Returns TRUE if value1 exists in the given list (value2, value3, …).   
                               |
+| value1 NOT IN (value2 [, value3]* )  | notIn(value1, value2 [, value3]*)     
       | Returns TRUE if value1 does not exist in the given list (value2, 
value3, …).                          |
 
 ## Logical Functions
 
-| Function             | Janino Code                 | Description             
                                        |
-|----------------------|-----------------------------|-----------------------------------------------------------------|
-| boolean1 OR boolean2 | boolean1 &#124;&#124; boolean2 | Returns TRUE if 
BOOLEAN1 is TRUE or BOOLEAN2 is TRUE.        |
-| boolean1 AND boolean2 | boolean1 && boolean2       | Returns TRUE if 
BOOLEAN1 and BOOLEAN2 are both TRUE.            |
-| NOT boolean          | !boolean                    | Returns TRUE if boolean 
is FALSE; returns FALSE if boolean is TRUE. |
-| boolean IS FALSE     | false == boolean            | Returns TRUE if boolean 
is FALSE; returns FALSE if boolean is TRUE. |
-| boolean IS NOT FALSE | true == boolean             | Returns TRUE if BOOLEAN 
is TRUE; returns FALSE if BOOLEAN is FALSE. |
-| boolean IS TRUE      | true == boolean             | Returns TRUE if BOOLEAN 
is TRUE; returns FALSE if BOOLEAN is FALSE. |
-| boolean IS NOT TRUE  | false == boolean            | Returns TRUE if boolean 
is FALSE; returns FALSE if boolean is TRUE. |
+| Function              | Janino Code                    | Description         
                                                |
+|-----------------------|--------------------------------|---------------------------------------------------------------------|
+| boolean1 OR boolean2  | boolean1 &#124;&#124; boolean2 | Returns TRUE if 
BOOLEAN1 is TRUE or BOOLEAN2 is TRUE.               |
+| boolean1 AND boolean2 | boolean1 && boolean2           | Returns TRUE if 
BOOLEAN1 and BOOLEAN2 are both TRUE.                |
+| NOT boolean           | !boolean                       | Returns TRUE if 
boolean is FALSE; returns FALSE if boolean is TRUE. |
+| boolean IS FALSE      | false == boolean               | Returns TRUE if 
boolean is FALSE; returns FALSE if boolean is TRUE. |
+| boolean IS NOT FALSE  | true == boolean                | Returns TRUE if 
BOOLEAN is TRUE; returns FALSE if BOOLEAN is FALSE. |
+| boolean IS TRUE       | true == boolean                | Returns TRUE if 
BOOLEAN is TRUE; returns FALSE if BOOLEAN is FALSE. |
+| boolean IS NOT TRUE   | false == boolean               | Returns TRUE if 
boolean is FALSE; returns FALSE if boolean is TRUE. |
 
 ## Arithmetic Functions
 
-| Function                           | Janino Code                 | 
Description                                                     |
-|------------------------------------|-----------------------------|-----------------------------------------------------------------|
-| numeric1 + numeric2                | numeric1 + numeric2         | Returns 
NUMERIC1 plus NUMERIC2.                                 |
-| numeric1 - numeric2                | numeric1 - numeric2         | Returns 
NUMERIC1 minus NUMERIC2.                                |
-| numeric1 * numeric2                | numeric1 * numeric2         | Returns 
NUMERIC1 multiplied by NUMERIC2.                        |
-| numeric1 / numeric2                | numeric1 / numeric2         | Returns 
NUMERIC1 divided by NUMERIC2.                          |
-| numeric1 % numeric2                | numeric1 % numeric2         | Returns 
the remainder (modulus) of numeric1 divided by numeric2. |
-| ABS(numeric)                       | abs(numeric)                | Returns 
the absolute value of numeric.                          |
-| CEIL(numeric)<br/>CEILING(numeric) | ceil(numeric)               | Rounds 
numeric up, and returns the smallest number that is greater than or equal to 
numeric. |
-| FLOOR(numeric)                     | floor(numeric)              | Rounds 
numeric down, and returns the largest number that is less than or equal to 
numeric. |
-| ROUND(numeric, int)                | round(numeric)              | Returns a 
number rounded to INT decimal places for NUMERIC.     |
-| UUID()                             | uuid()                      | Returns 
an UUID (Universally Unique Identifier) string (e.g., 
"3d3c68f7-f608-473f-b60c-b0c44ad4cc4e") according to RFC 4122 type 4 (pseudo 
randomly generated) UUID. |
+| Function                           | Janino Code         | Description       
                                                                                
                                                                   |
+|------------------------------------|---------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| numeric1 + numeric2                | numeric1 + numeric2 | Returns NUMERIC1 
plus NUMERIC2.                                                                  
                                                                    |
+| numeric1 - numeric2                | numeric1 - numeric2 | Returns NUMERIC1 
minus NUMERIC2.                                                                 
                                                                    |
+| numeric1 * numeric2                | numeric1 * numeric2 | Returns NUMERIC1 
multiplied by NUMERIC2.                                                         
                                                                    |
+| numeric1 / numeric2                | numeric1 / numeric2 | Returns NUMERIC1 
divided by NUMERIC2.                                                            
                                                                    |
+| numeric1 % numeric2                | numeric1 % numeric2 | Returns the 
remainder (modulus) of numeric1 divided by numeric2.                            
                                                                         |
+| ABS(numeric)                       | abs(numeric)        | Returns the 
absolute value of numeric.                                                      
                                                                         |
+| CEIL(numeric)<br/>CEILING(numeric) | ceil(numeric)       | Rounds numeric 
up, and returns the smallest number that is greater than or equal to numeric.   
                                                                      |
+| FLOOR(numeric)                     | floor(numeric)      | Rounds numeric 
down, and returns the largest number that is less than or equal to numeric.     
                                                                      |
+| ROUND(numeric, int)                | round(numeric)      | Returns a number 
rounded to INT decimal places for NUMERIC.                                      
                                                                    |
+| UUID()                             | uuid()              | Returns an UUID 
(Universally Unique Identifier) string (e.g., 
"3d3c68f7-f608-473f-b60c-b0c44ad4cc4e") according to RFC 4122 type 4 (pseudo 
randomly generated) UUID. |
 
 ## String Functions
 
-| Function             | Janino Code              | Description                
                                                                                
                                                                                
       |
-| -------------------- | ------------------------ 
|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| string1 &#124;&#124; string2 | concat(string1, string2) | Returns the 
concatenation of STRING1 and STRING2.                                           
                                                                                
                      |
-| CHAR_LENGTH(string)  | charLength(string)       | Returns the number of 
characters in STRING.                                                           
                                                                                
            |
-| UPPER(string)        | upper(string)            | Returns string in 
uppercase.                                                                      
                                                                                
                |
-| LOWER(string) | lower(string) | Returns string in lowercase.                 
                                                                                
                                                                     |
-| TRIM(string1) | trim('BOTH',string1) | Returns a string that removes 
whitespaces at both sides.                                                      
                                                                                
    |
-| REGEXP_REPLACE(string1, string2, string3) | regexpReplace(string1, string2, 
string3) | Returns a string from STRING1 with all the substrings that match a 
regular expression STRING2 consecutively being replaced with STRING3. E.g., 
'foobar'.regexpReplace('oo\|ar', '') returns "fb". |
-| SUBSTR(string, integer1[, integer2]) | substr(string,integer1,integer2) | 
Returns a substring of STRING starting from position integer1 with length 
integer2 (to the end by default).                                               
                                        |
-| SUBSTRING(string FROM integer1 [ FOR integer2 ]) | 
substring(string,integer1,integer2) | Returns a substring of STRING starting 
from position integer1 with length integer2 (to the end by default).            
                                                                           |
-| CONCAT(string1, string2,…) | concat(string1, string2,…) | Returns a string 
that concatenates string1, string2, …. E.g., CONCAT('AA', 'BB', 'CC') returns 
'AABBCC'.                                                                       
                   |
+| Function                                         | Janino Code               
               | Description                                                    
                                                                                
                                                   |
+|--------------------------------------------------|------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| string1 &#124;&#124; string2                     | concat(string1, string2)  
               | Returns the concatenation of STRING1 and STRING2.              
                                                                                
                                                   |
+| CHAR_LENGTH(string)                              | charLength(string)        
               | Returns the number of characters in STRING.                    
                                                                                
                                                   |
+| UPPER(string)                                    | upper(string)             
               | Returns string in uppercase.                                   
                                                                                
                                                   |
+| LOWER(string)                                    | lower(string)             
               | Returns string in lowercase.                                   
                                                                                
                                                   |
+| TRIM(string1)                                    | trim('BOTH',string1)      
               | Returns a string that removes whitespaces at both sides.       
                                                                                
                                                   |
+| REGEXP_REPLACE(string1, string2, string3)        | regexpReplace(string1, 
string2, string3) | Returns a string from STRING1 with all the substrings that 
match a regular expression STRING2 consecutively being replaced with STRING3. 
E.g., 'foobar'.regexpReplace('oo\|ar', '') returns "fb". |
+| SUBSTR(string, integer1[, integer2])             | 
substr(string,integer1,integer2)         | Returns a substring of STRING 
starting from position integer1 with length integer2 (to the end by default).   
                                                                                
    |
+| SUBSTRING(string FROM integer1 [ FOR integer2 ]) | 
substring(string,integer1,integer2)      | Returns a substring of STRING 
starting from position integer1 with length integer2 (to the end by default).   
                                                                                
    |
+| CONCAT(string1, string2,…)                       | concat(string1, 
string2,…)               | Returns a string that concatenates string1, string2, 
…. E.g., CONCAT('AA', 'BB', 'CC') returns 'AABBCC'.                             
                                                             |
 
 ## Temporal Functions
 
-| Function             | Janino Code              | Description                
                       |
-| -------------------- | ------------------------ | 
------------------------------------------------- |
-| LOCALTIME | localtime() | Returns the current SQL time in the local time 
zone, the return type is TIME(0). |
-| LOCALTIMESTAMP | localtimestamp() | Returns the current SQL timestamp in 
local time zone, the return type is TIMESTAMP(3). |
-| CURRENT_TIME | currentTime() | Returns the current SQL time in the local 
time zone, this is a synonym of LOCAL_TIME. |
-| CURRENT_DATE                                         | currentDate() | 
Returns the current SQL date in the local time zone. |
-| CURRENT_TIMESTAMP | currentTimestamp() | Returns the current SQL timestamp 
in the local time zone, the return type is TIMESTAMP_LTZ(3). |
-| NOW() | now() | Returns the current SQL timestamp in the local time zone, 
this is a synonym of CURRENT_TIMESTAMP. |
-| DATE_FORMAT(timestamp, string) | dateFormat(timestamp, string) | Converts 
timestamp to a value of string in the format specified by the date format 
string. The format string is compatible with Java's SimpleDateFormat. |
-| TIMESTAMPADD(timeintervalunit, interval, timepoint)   | 
timestampadd(timeintervalunit, interval, timepoint)  | Returns the timestamp of 
timepoint2 after timepoint added interval. The unit for the interval is given 
by the first argument, which should be one of the following values: SECOND, 
MINUTE, HOUR, DAY, MONTH, or YEAR.     |
-| TIMESTAMPDIFF(timepointunit, timepoint1, timepoint2) | 
timestampDiff(timepointunit, timepoint1, timepoint2) | Returns the (signed) 
number of timepointunit between timepoint1 and timepoint2. The unit for the 
interval is given by the first argument, which should be one of the following 
values: SECOND, MINUTE, HOUR, DAY, MONTH, or YEAR. |
-| TO_DATE(string1[, string2]) | toDate(string1[, string2]) | Converts a date 
string string1 with format string2 (by default 'yyyy-MM-dd') to a date. |
-| TO_TIMESTAMP(string1[, string2]) | toTimestamp(string1[, string2]) | 
Converts date time string string1 with format string2 (by default: 'yyyy-MM-dd 
HH:mm:ss') to a timestamp, without time zone. |
-| FROM_UNIXTIME(numeric[, string]) | fromUnixtime(NUMERIC[, STRING]) | Returns 
a representation of the numeric argument as a value in string format (default 
is ‘yyyy-MM-dd HH:mm:ss’). numeric is an internal timestamp value representing 
seconds since ‘1970-01-01 00:00:00’ UTC, such as produced by the 
UNIX_TIMESTAMP() function. The return value is expressed in the session time 
zone (specified in TableConfig). E.g., FROM_UNIXTIME(44) returns ‘1970-01-01 
00:00:44’ if in UTC time zone, but re [...]
-| UNIX_TIMESTAMP() | unixTimestamp() | Gets current Unix timestamp in seconds. 
This function is not deterministic which means the value would be recalculated 
for each record. |
-| UNIX_TIMESTAMP(string1[, string2]) | unixTimestamp(STRING1[, STRING2]) | 
Converts a date time string string1 with format string2 (by default: yyyy-MM-dd 
HH:mm:ss if not specified) to Unix timestamp (in seconds), using the specified 
timezone in table config.<br/>If a time zone is specified in the date time 
string and parsed by UTC+X format such as “yyyy-MM-dd HH:mm:ss.SSS X”, this 
function will use the specified timezone in the date time string instead of the 
timezone in table config. I [...]
+| Function                                             | Janino Code           
                               | Description                                    
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
+|------------------------------------------------------|------------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 [...]
+| LOCALTIME                                            | localtime()           
                               | Returns the current SQL time in the local time 
zone, the return type is TIME(0).                                               
                                                                                
                                                                                
                                                                                
              [...]
+| LOCALTIMESTAMP                                       | localtimestamp()      
                               | Returns the current SQL timestamp in local 
time zone, the return type is TIMESTAMP(3).                                     
                                                                                
                                                                                
                                                                                
                  [...]
+| CURRENT_TIME                                         | currentTime()         
                               | Returns the current SQL time in the local time 
zone, this is a synonym of LOCAL_TIME.                                          
                                                                                
                                                                                
                                                                                
              [...]
+| CURRENT_DATE                                         | currentDate()         
                               | Returns the current SQL date in the local time 
zone.                                                                           
                                                                                
                                                                                
                                                                                
              [...]
+| CURRENT_TIMESTAMP                                    | currentTimestamp()    
                               | Returns the current SQL timestamp in the local 
time zone, the return type is TIMESTAMP_LTZ(3).                                 
                                                                                
                                                                                
                                                                                
              [...]
+| NOW()                                                | now()                 
                               | Returns the current SQL timestamp in the local 
time zone, this is a synonym of CURRENT_TIMESTAMP.                              
                                                                                
                                                                                
                                                                                
              [...]
+| DATE_FORMAT(timestamp, string)                       | dateFormat(timestamp, 
string)                        | Converts timestamp to a value of string in the 
format specified by the format string. The format string is compatible with 
Java's SimpleDateFormat.                                                        
                                                                                
                                                                                
                  [...]
+| DATE_FORMAT(date, string)                            | dateFormat(date, 
string)                             | Converts given date to a value of string 
in the format specified by the format string. The format string is compatible 
with Java's SimpleDateFormat.                                                   
                                                                                
                                                                                
                      [...]
+| DATE_FORMAT(time, string)                            | dateFormat(time, 
string)                             | Converts given time to a value of string 
in the format specified by the format string. The format string is compatible 
with Java's SimpleDateFormat.                                                   
                                                                                
                                                                                
                      [...]
+| DATE_FORMAT_TZ(timestamp, format, timezone)          | 
dateFormatTz(timestamp, format, timezone)            | Formats a timestamp or 
datetime value as a string using the given pattern and the specified time zone. 
The timezone argument can be a time zone ID (for example, 'UTC', 
'Asia/Shanghai') or an offset such as '+08:00'.                                 
                                                                                
                                                     [...]
+| TIMESTAMPADD(timeintervalunit, interval, timepoint)  | 
timestampadd(timeintervalunit, interval, timepoint)  | Returns the timestamp of 
timepoint2 after timepoint added interval. The unit for the interval is given 
by the first argument, which should be one of the following values: SECOND, 
MINUTE, HOUR, DAY, MONTH, or YEAR.                                              
                                                                                
                                          [...]
+| TIMESTAMPDIFF(timepointunit, timepoint1, timepoint2) | 
timestampDiff(timepointunit, timepoint1, timepoint2) | Returns the (signed) 
number of timepointunit between timepoint1 and timepoint2. The unit for the 
interval is given by the first argument, which should be one of the following 
values: SECOND, MINUTE, HOUR, DAY, MONTH, or YEAR.                              
                                                                                
                                              [...]
+| TO_DATE(string1[, string2])                          | toDate(string1[, 
string2])                           | Converts a date string string1 with 
format string2 (by default 'yyyy-MM-dd') to a date.                             
                                                                                
                                                                                
                                                                                
                         [...]
+| TO_TIMESTAMP(string1[, string2])                     | toTimestamp(string1[, 
string2])                      | Converts date time string string1 with format 
string2 (by default: 'yyyy-MM-dd HH:mm:ss') to a timestamp, without time zone.  
                                                                                
                                                                                
                                                                                
               [...]
+| TO_TIMESTAMP_LTZ(string1[, string2])                 | 
toTimestampLtz(string1[, string2])                   | Converts date time 
string string1 with format string2 (by default: 'yyyy-MM-dd HH:mm:ss') to a 
timestamp, with local time zone.                                                
                                                                                
                                                                                
                                              [...]
+| FROM_UNIXTIME(numeric[, string])                     | 
fromUnixtime(NUMERIC[, STRING])                      | Returns a representation 
of the numeric argument as a value in string format (default is ‘yyyy-MM-dd 
HH:mm:ss’). numeric is an internal timestamp value representing seconds since 
‘1970-01-01 00:00:00’ UTC, such as produced by the UNIX_TIMESTAMP() function. 
The return value is expressed in the session time zone (specified in 
TableConfig). E.g., FROM_UNIXTIME(44) returns ‘1970-01 [...]
+| UNIX_TIMESTAMP()                                     | unixTimestamp()       
                               | Gets current Unix timestamp in seconds. This 
function is not deterministic which means the value would be recalculated for 
each record.                                                                    
                                                                                
                                                                                
                  [...]
+| UNIX_TIMESTAMP(string1[, string2])                   | 
unixTimestamp(STRING1[, STRING2])                    | Converts a date time 
string string1 with format string2 (by default: yyyy-MM-dd HH:mm:ss if not 
specified) to Unix timestamp (in seconds), using the specified timezone in 
table config.<br/>If a time zone is specified in the date time string and 
parsed by UTC+X format such as “yyyy-MM-dd HH:mm:ss.SSS X”, this function will 
use the specified timezone in the date time string inste [...]
+| DATE_ADD(date, int)                                  | dateAdd(date, int)    
                               | Add N days to given date data.                 
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
 
 ## Conditional Functions
 
-| Function             | Janino Code              | Description                
                       |
-| -------------------- | ------------------------ | 
------------------------------------------------- |
-| CASE value WHEN value1_1 [, value1_2]* THEN RESULT1 (WHEN value2_1 [, 
value2_2 ]* THEN result_2)* (ELSE result_z) END | Nested ternary expression | 
Returns resultX when the first time value is contained in (valueX_1, valueX_2, 
…). When no value matches, returns result_z if it is provided and returns NULL 
otherwise. |
-| CASE WHEN condition1 THEN result1 (WHEN condition2 THEN result2)* (ELSE 
result_z) END | Nested ternary expression | Returns resultX when the first 
conditionX is met. When no condition is met, returns result_z if it is provided 
and returns NULL otherwise. |
-| COALESCE(value1 [, value2]*) | coalesce(Object... objects) | Returns the 
first argument that is not NULL.If all arguments are NULL, it returns NULL as 
well. The return type is the least restrictive, common type of all of its 
arguments. The return type is nullable if all arguments are nullable as well. |
-| IF(condition, true_value, false_value)   | condition ? true_value : 
false_value | Returns the true_value if condition is met, otherwise 
false_value. E.g., IF(5 > 3, 5, 3) returns 5. |
+| Function                                                                     
                                         | Janino Code                          
| Description                                                                   
                                                                                
                                                                                
    |
+|-----------------------------------------------------------------------------------------------------------------------|--------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| CASE value WHEN value1_1 [, value1_2]* THEN RESULT1 (WHEN value2_1 [, 
value2_2 ]* THEN result_2)* (ELSE result_z) END | Nested ternary expression     
       | Returns resultX when the first time value is contained in (valueX_1, 
valueX_2, …). When no value matches, returns result_z if it is provided and 
returns NULL otherwise.                                                         
                 |
+| CASE WHEN condition1 THEN result1 (WHEN condition2 THEN result2)* (ELSE 
result_z) END                                 | Nested ternary expression       
     | Returns resultX when the first conditionX is met. When no condition is 
met, returns result_z if it is provided and returns NULL otherwise.             
                                                                                
           |
+| COALESCE(value1 [, value2]*)                                                 
                                         | coalesce(Object... objects)          
| Returns the first argument that is not NULL.If all arguments are NULL, it 
returns NULL as well. The return type is the least restrictive, common type of 
all of its arguments. The return type is nullable if all arguments are nullable 
as well. |
+| IF(condition, true_value, false_value)                                       
                                         | condition ? true_value : false_value 
| Returns the true_value if condition is met, otherwise false_value. E.g., IF(5 
> 3, 5, 3) returns 5.                                                           
                                                                                
    |
 
 ## Casting Functions
 
diff --git 
a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
 
b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
index c81b1ba46..e4da2fca3 100644
--- 
a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
+++ 
b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
@@ -51,7 +51,6 @@ import 
org.apache.flink.cdc.connectors.values.sink.ValuesDataSinkOptions;
 import org.apache.flink.cdc.connectors.values.source.ValuesDataSourceHelper;
 import org.apache.flink.cdc.connectors.values.source.ValuesDataSourceOptions;
 import 
org.apache.flink.cdc.runtime.operators.transform.exceptions.TransformException;
-import org.apache.flink.cdc.runtime.parser.JaninoCompiler;
 import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.test.junit5.MiniClusterExtension;
@@ -2737,14 +2736,9 @@ class FlinkPipelineTransformITCase {
                 .cause()
                 .isExactlyInstanceOf(FlinkRuntimeException.class)
                 .hasMessage(
-                        "Failed to compile expression 
TransformExpressionKey{originalExpression='id1 > 0', expression='"
-                                + JaninoCompiler.LOAD_MODULES_EXPRESSION
-                                + "greaterThan($0, 0)', 
argumentNames=[__time_zone__, __epoch_time__], argumentClasses=[class 
java.lang.String, class java.lang.Long], returnClass=class java.lang.Boolean, 
columnNameMap={id1=$0}}")
+                        "Failed to compile expression 
TransformExpressionKey{originalExpression='id1 > 0', 
compiledExpression='greaterThan($0, 0)', argumentNames=[__time_zone__, 
__epoch_time__], argumentClasses=[class java.lang.String, class 
java.lang.Long], returnClass=class java.lang.Boolean, columnNameMap={id1=$0}}")
                 .cause()
-                .hasMessageContaining(
-                        "Compiled expression: "
-                                + JaninoCompiler.LOAD_MODULES_EXPRESSION
-                                + "greaterThan($0, 0)")
+                .hasMessageContaining("Compiled expression: greaterThan($0, 
0)")
                 .hasMessageContaining("Column name map: {$0 -> id1}")
                 .rootCause()
                 .isExactlyInstanceOf(CompileException.class)
@@ -2812,7 +2806,6 @@ class FlinkPipelineTransformITCase {
                                 + "\tOriginal expression: name + 1 > 0\n"
                                 + "\tCompiled expression: greaterThan($0 + 1, 
0)\n"
                                 + "\tColumn name map: {$0 -> name}")
-                .hasMessageContaining("Column name map: {$0 -> name}")
                 .rootCause()
                 .isExactlyInstanceOf(RuntimeException.class)
                 .hasMessage(
diff --git a/flink-cdc-composer/src/test/resources/specs/comparison.yaml 
b/flink-cdc-composer/src/test/resources/specs/comparison.yaml
index 99c6d6d2d..c022f0780 100644
--- a/flink-cdc-composer/src/test/resources/specs/comparison.yaml
+++ b/flink-cdc-composer/src/test/resources/specs/comparison.yaml
@@ -166,7 +166,6 @@
     DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, true, 
false, true, false], op=INSERT, meta=()}
     DataChangeEvent{tableId=foo.bar.baz, before=[0, null, true, false, true, 
false], after=[], op=DELETE, meta=()}
 - do: Between Op
-  ignore: FLINK-38906
   projection: |-
     id_
     tinyint_ BETWEEN CAST(1 AS TINYINT) AND CAST(3 AS TINYINT) AS comp_1
@@ -189,7 +188,6 @@
     DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, false, false, 
false, false, false, false, false, false, false, false, false], op=INSERT, 
meta=()}
     DataChangeEvent{tableId=foo.bar.baz, before=[0, false, false, false, 
false, false, false, false, false, false, false, false], after=[], op=DELETE, 
meta=()}
 - do: Not Between Op
-  ignore: FLINK-38906
   projection: |-
     id_
     tinyint_ NOT BETWEEN CAST(1 AS TINYINT) AND CAST(3 AS TINYINT) AS comp_1
@@ -238,7 +236,6 @@
   projection: id_, char_ NOT LIKE 'A.*' AS comp_1, varchar_ NOT LIKE '.*rro' 
AS comp_2, string_ NOT LIKE 'From [A-Z] to [A-Z] is Lie' AS comp_3
   primary-key: id_
 - do: In Op
-  ignore: FLINK-38906
   projection: |-
     id_
     tinyint_ IN (CAST(2 AS TINYINT)) AS comp_1
@@ -261,7 +258,6 @@
     DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, false, false, 
false, false, false, false, false, false, false, false, false], op=INSERT, 
meta=()}
     DataChangeEvent{tableId=foo.bar.baz, before=[0, false, false, false, 
false, false, false, false, false, false, false, false], after=[], op=DELETE, 
meta=()}
 - do: Not In Op
-  ignore: FLINK-38906
   projection: |-
     id_
     tinyint_ NOT IN (CAST(2 AS TINYINT)) AS comp_1
diff --git a/flink-cdc-composer/src/test/resources/specs/decimal.yaml 
b/flink-cdc-composer/src/test/resources/specs/decimal.yaml
index e12ddb025..f1418dd32 100644
--- a/flink-cdc-composer/src/test/resources/specs/decimal.yaml
+++ b/flink-cdc-composer/src/test/resources/specs/decimal.yaml
@@ -14,7 +14,6 @@
 
################################################################################
 
 - do: Add Op
-  ignore: FLINK-38906
   projection: |-
     id_
     decimal_10_0_ + CAST(1 AS DECIMAL(1, 0)) AS comp_1
@@ -27,7 +26,6 @@
     DataChangeEvent{tableId=foo.bar.baz, before=[1, 1234567891, null], 
after=[-1, -9876543209, null], op=UPDATE, meta=()}
     DataChangeEvent{tableId=foo.bar.baz, before=[-1, -9876543209, null], 
after=[], op=DELETE, meta=()}
 - do: Subtract Op
-  ignore: FLINK-38906
   projection: |-
     id_
     decimal_10_0_ - CAST(1 AS DECIMAL(1, 0)) AS comp_1
@@ -40,7 +38,6 @@
     DataChangeEvent{tableId=foo.bar.baz, before=[1, 1234567889, null], 
after=[-1, -9876543211, null], op=UPDATE, meta=()}
     DataChangeEvent{tableId=foo.bar.baz, before=[-1, -9876543211, null], 
after=[], op=DELETE, meta=()}
 - do: Multiply Op
-  ignore: FLINK-38906
   projection: |-
     id_
     decimal_10_0_ * CAST(2 AS DECIMAL(1, 0)) AS comp_1
@@ -53,7 +50,6 @@
     DataChangeEvent{tableId=foo.bar.baz, before=[1, 2469135780, null], 
after=[-1, -19753086420, null], op=UPDATE, meta=()}
     DataChangeEvent{tableId=foo.bar.baz, before=[-1, -19753086420, null], 
after=[], op=DELETE, meta=()}
 - do: Divide Op
-  ignore: FLINK-38906
   projection: |-
     id_
     decimal_10_0_ / CAST(2 AS DECIMAL(1, 0)) AS comp_1
@@ -66,7 +62,6 @@
     DataChangeEvent{tableId=foo.bar.baz, before=[1, 617.283945, 
61728394506172839.45], after=[-1, -4938.271605, null], op=UPDATE, meta=()}
     DataChangeEvent{tableId=foo.bar.baz, before=[-1, -4938.271605, null], 
after=[], op=DELETE, meta=()}
 - do: Abs Op
-  ignore: FLINK-38906
   projection: |-
     id_
     ABS(decimal_10_0_) AS comp_1
@@ -80,7 +75,6 @@
     DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, null], 
op=INSERT, meta=()}
     DataChangeEvent{tableId=foo.bar.baz, before=[0, null, null], after=[], 
op=DELETE, meta=()}
 - do: Ceil Op
-  ignore: FLINK-38906
   projection: |-
     id_
     CEIL(decimal_10_0_) AS comp_1
@@ -94,7 +88,6 @@
     DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, null], 
op=INSERT, meta=()}
     DataChangeEvent{tableId=foo.bar.baz, before=[0, null, null], after=[], 
op=DELETE, meta=()}
 - do: Floor Op
-  ignore: FLINK-38906
   projection: |-
     id_
     FLOOR(decimal_10_0_) AS comp_1
@@ -108,7 +101,6 @@
     DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, null], 
op=INSERT, meta=()}
     DataChangeEvent{tableId=foo.bar.baz, before=[0, null, null], after=[], 
op=DELETE, meta=()}
 - do: Round Op
-  ignore: FLINK-38906
   projection: |-
     id_
     ROUND(decimal_10_0_, 1) AS comp_1
diff --git a/flink-cdc-composer/src/test/resources/specs/meta.yaml 
b/flink-cdc-composer/src/test/resources/specs/meta.yaml
index e8c893bf5..df6c2f857 100644
--- a/flink-cdc-composer/src/test/resources/specs/meta.yaml
+++ b/flink-cdc-composer/src/test/resources/specs/meta.yaml
@@ -47,15 +47,3 @@
     DataChangeEvent{tableId=foo.bar.baz, before=[], after=[-D, -1, false, -2, 
-3, -4, -5, -7.7, -88.88, -9876543210, -987654321098765432.10, 爱丽丝, 疯帽子, 
天地玄黄宇宙洪荒, 5LiA5LqM5LiJ5Zub5LqU, 5YWt5LiD5YWr5Lmd5Y2B, 
5ZC+6Lyp44Gv54yr44Gn44GC44KL, 1970-01-09T08:57:36.789723456, 
1970-01-10T15:49:27.891834561, 1970-01-11T22:41:18.912945612, 
1970-01-09T08:57:36.789723456+08:00, 1970-01-10T15:49:27.891834561+01:00, 
1970-01-11T22:41:18.912945612-04:00, 1970-01-09T08:57:36.789723456, 
1970-01-10T15:49:27.89 [...]
     DataChangeEvent{tableId=foo.bar.baz, before=[], after=[+I, 0, null, null, 
null, null, null, null, null, null, null, null, null, null, null, null, null, 
null, null, null, null, null, null, null, null, null, null, null, null, null, 
null, null, null, null, null, null], op=INSERT, meta=()}
     DataChangeEvent{tableId=foo.bar.baz, before=[], after=[-D, 0, null, null, 
null, null, null, null, null, null, null, null, null, null, null, null, null, 
null, null, null, null, null, null, null, null, null, null, null, null, null, 
null, null, null, null, null, null], op=INSERT, meta=()}
-- do: Downcase Post Converter
-  ignore: FLINK-38887
-  projection: id_, 'UPCASE' AS UPCASE, 'downcase' AS downcase, 'MiXeD cAsE' AS 
MiXeD_cAsE
-  primary-key: id_
-  converters: FIELD_NAME_LOWER_CASE
-  expect: |-
-    CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT 
NULL 'Identifier',`upcase` STRING,`downcase` STRING,`mixed_case` STRING}, 
primaryKeys=id_, options=()}
-    DataChangeEvent{tableId=foo.bar.baz, before=[], after=[1, UPCASE, 
downcase, MiXeD cAsE], op=INSERT, meta=()}
-    DataChangeEvent{tableId=foo.bar.baz, before=[1, UPCASE, downcase, MiXeD 
cAsE], after=[-1, UPCASE, downcase, MiXeD cAsE], op=UPDATE, meta=()}
-    DataChangeEvent{tableId=foo.bar.baz, before=[-1, UPCASE, downcase, MiXeD 
cAsE], after=[], op=DELETE, meta=()}
-    DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, UPCASE, 
downcase, MiXeD cAsE], op=INSERT, meta=()}
-    DataChangeEvent{tableId=foo.bar.baz, before=[0, UPCASE, downcase, MiXeD 
cAsE], after=[], op=DELETE, meta=()}
diff --git a/flink-cdc-composer/src/test/resources/specs/temporal.yaml 
b/flink-cdc-composer/src/test/resources/specs/temporal.yaml
index 3e485e659..3e6125296 100644
--- a/flink-cdc-composer/src/test/resources/specs/temporal.yaml
+++ b/flink-cdc-composer/src/test/resources/specs/temporal.yaml
@@ -78,7 +78,6 @@
     DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, null, 
null, null, null, null], op=INSERT, meta=()}
     DataChangeEvent{tableId=foo.bar.baz, before=[0, null, null, null, null, 
null, null], after=[], op=DELETE, meta=()}
 - do: To Date Function
-  ignore: FLINK-38906
   projection: |-
     id_
     TO_DATE('2025-01-05') AS comp_1
@@ -117,7 +116,6 @@
     TO_TIMESTAMP('2024 !! 02 !! 14 11 !! 45 !! 49', 'yyyy//MM//dd') AS comp
   expect-error: 'Unparseable date: "2024 !! 02 !! 14 11 !! 45 !! 49"'
 - do: Format DateType and TimeType
-  ignore: FLINK-38906
   projection: |-
     id_, date_, time_0_, time_6_
     DATE_FORMAT(date_, 'yyyy->MM->dd') AS date_fmt_1
@@ -134,7 +132,6 @@
     DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, null, 
null, null, null, null, null, null, null], op=INSERT, meta=()}
     DataChangeEvent{tableId=foo.bar.baz, before=[0, null, null, null, null, 
null, null, null, null, null], after=[], op=DELETE, meta=()}
 - do: From UnixTime Function
-  ignore: FLINK-38906
   projection: |-
     FROM_UNIXTIME(bigint_) AS col_1
     FROM_UNIXTIME(bigint_, 'yyyy/MM/dd HH;mm;ss') AS col_2
@@ -146,7 +143,6 @@
     DataChangeEvent{tableId=foo.bar.baz, before=[], after=[null, null], 
op=INSERT, meta=()}
     DataChangeEvent{tableId=foo.bar.baz, before=[null, null], after=[], 
op=DELETE, meta=()}
 - do: From UnixTime Function with Implicit Conversion
-  ignore: FLINK-38906
   projection: |-
     FROM_UNIXTIME(int_) AS col_1
     FROM_UNIXTIME(int_, 'yyyy/MM/dd HH;mm;ss') AS col_2
@@ -158,7 +154,6 @@
     DataChangeEvent{tableId=foo.bar.baz, before=[], after=[null, null], 
op=INSERT, meta=()}
     DataChangeEvent{tableId=foo.bar.baz, before=[null, null], after=[], 
op=DELETE, meta=()}
 - do: To Timestamp LTZ Function
-  ignore: FLINK-38906
   projection: |-
     TO_TIMESTAMP_LTZ(bigint_) AS col_1
     TO_TIMESTAMP_LTZ(bigint_, 0) AS col_2
@@ -174,7 +169,6 @@
     DataChangeEvent{tableId=foo.bar.baz, before=[], after=[null, null, null, 
1999-12-31T23:50:23.456, 2000-03-29T19:21:48.321, 2001-11-23T02:48:25.999], 
op=INSERT, meta=()}
     DataChangeEvent{tableId=foo.bar.baz, before=[null, null, null, 
1999-12-31T23:50:23.456, 2000-03-29T19:21:48.321, 2001-11-23T02:48:25.999], 
after=[], op=DELETE, meta=()}
 - do: Formatting TIMESTAMP(0) with Timezone
-  ignore: FLINK-38906
   projection: |-
     DATE_FORMAT_TZ(timestamp_0_, 'Asia/Shanghai') AS col_1
     DATE_FORMAT_TZ(timestamp_0_, 'America/Los_Angeles') AS col_2
@@ -190,7 +184,6 @@
     DataChangeEvent{tableId=foo.bar.baz, before=[], after=[null, null, null, 
null, null, null], op=INSERT, meta=()}
     DataChangeEvent{tableId=foo.bar.baz, before=[null, null, null, null, null, 
null], after=[], op=DELETE, meta=()}
 - do: Formatting TIMESTAMP(6) with Timezone
-  ignore: FLINK-38906
   projection: |-
     DATE_FORMAT_TZ(timestamp_6_, 'Asia/Shanghai') AS col_1
     DATE_FORMAT_TZ(timestamp_6_, 'America/Los_Angeles') AS col_2
@@ -206,7 +199,6 @@
     DataChangeEvent{tableId=foo.bar.baz, before=[], after=[null, null, null, 
null, null, null], op=INSERT, meta=()}
     DataChangeEvent{tableId=foo.bar.baz, before=[null, null, null, null, null, 
null], after=[], op=DELETE, meta=()}
 - do: Formatting TIMESTAMP(9) with Timezone
-  ignore: FLINK-38906
   projection: |-
     DATE_FORMAT_TZ(timestamp_9_, 'Asia/Shanghai') AS col_1
     DATE_FORMAT_TZ(timestamp_9_, 'America/Los_Angeles') AS col_2
@@ -222,7 +214,6 @@
     DataChangeEvent{tableId=foo.bar.baz, before=[], after=[null, null, null, 
null, null, null], op=INSERT, meta=()}
     DataChangeEvent{tableId=foo.bar.baz, before=[null, null, null, null, null, 
null], after=[], op=DELETE, meta=()}
 - do: Formatting TIMESTAMP_LTZ(0) with Timezone
-  ignore: FLINK-38906
   projection: |-
     DATE_FORMAT_TZ(timestamp_ltz_0_, 'Asia/Shanghai') AS col_1
     DATE_FORMAT_TZ(timestamp_ltz_0_, 'America/Los_Angeles') AS col_2
@@ -238,7 +229,6 @@
     DataChangeEvent{tableId=foo.bar.baz, before=[], after=[null, null, null, 
null, null, null], op=INSERT, meta=()}
     DataChangeEvent{tableId=foo.bar.baz, before=[null, null, null, null, null, 
null], after=[], op=DELETE, meta=()}
 - do: Formatting TIMESTAMP_LTZ(6) with Timezone
-  ignore: FLINK-38906
   projection: |-
     DATE_FORMAT_TZ(timestamp_ltz_6_, 'Asia/Shanghai') AS col_1
     DATE_FORMAT_TZ(timestamp_ltz_6_, 'America/Los_Angeles') AS col_2
@@ -254,7 +244,6 @@
     DataChangeEvent{tableId=foo.bar.baz, before=[], after=[null, null, null, 
null, null, null], op=INSERT, meta=()}
     DataChangeEvent{tableId=foo.bar.baz, before=[null, null, null, null, null, 
null], after=[], op=DELETE, meta=()}
 - do: Formatting TIMESTAMP_LTZ(9) with Timezone
-  ignore: FLINK-38906
   projection: |-
     DATE_FORMAT_TZ(timestamp_ltz_9_, 'Asia/Shanghai') AS col_1
     DATE_FORMAT_TZ(timestamp_ltz_9_, 'America/Los_Angeles') AS col_2
@@ -270,7 +259,6 @@
     DataChangeEvent{tableId=foo.bar.baz, before=[], after=[null, null, null, 
null, null, null], op=INSERT, meta=()}
     DataChangeEvent{tableId=foo.bar.baz, before=[null, null, null, null, null, 
null], after=[], op=DELETE, meta=()}
 - do: DATE_ADD Function
-  ignore: FLINK-38906
   projection: |-
     DATE_ADD(timestamp_0_, 17) AS col_1
     DATE_ADD(timestamp_6_, 17) AS col_2
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/impl/ComparisonFunctions.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/impl/ComparisonFunctions.java
index be2df0e1f..41605ca8b 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/impl/ComparisonFunctions.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/impl/ComparisonFunctions.java
@@ -78,6 +78,13 @@ public class ComparisonFunctions {
         return value.compareTo(minValue) >= 0 && value.compareTo(maxValue) <= 
0;
     }
 
+    public static boolean betweenAsymmetric(Byte value, byte minValue, byte 
maxValue) {
+        if (value == null) {
+            return false;
+        }
+        return value >= minValue && value <= maxValue;
+    }
+
     public static boolean betweenAsymmetric(Short value, short minValue, short 
maxValue) {
         if (value == null) {
             return false;
@@ -125,6 +132,10 @@ public class ComparisonFunctions {
         return !betweenAsymmetric(value, minValue, maxValue);
     }
 
+    public static boolean notBetweenAsymmetric(Byte value, byte minValue, byte 
maxValue) {
+        return !betweenAsymmetric(value, minValue, maxValue);
+    }
+
     public static boolean notBetweenAsymmetric(Short value, short minValue, 
short maxValue) {
         return !betweenAsymmetric(value, minValue, maxValue);
     }
@@ -151,37 +162,45 @@ public class ComparisonFunctions {
     }
 
     public static boolean in(String value, String... str) {
-        return Arrays.stream(str).anyMatch(item -> value.equals(item));
+        return Arrays.asList(str).contains(value);
+    }
+
+    public static boolean in(Byte value, Byte... values) {
+        return Arrays.asList(values).contains(value);
     }
 
     public static boolean in(Short value, Short... values) {
-        return Arrays.stream(values).anyMatch(item -> value.equals(item));
+        return Arrays.asList(values).contains(value);
     }
 
     public static boolean in(Integer value, Integer... values) {
-        return Arrays.stream(values).anyMatch(item -> value.equals(item));
+        return Arrays.asList(values).contains(value);
     }
 
     public static boolean in(Long value, Long... values) {
-        return Arrays.stream(values).anyMatch(item -> value.equals(item));
+        return Arrays.asList(values).contains(value);
     }
 
     public static boolean in(Float value, Float... values) {
-        return Arrays.stream(values).anyMatch(item -> value.equals(item));
+        return Arrays.asList(values).contains(value);
     }
 
     public static boolean in(Double value, Double... values) {
-        return Arrays.stream(values).anyMatch(item -> value.equals(item));
+        return Arrays.asList(values).contains(value);
     }
 
     public static boolean in(BigDecimal value, BigDecimal... values) {
-        return Arrays.stream(values).anyMatch(item -> value.equals(item));
+        return Arrays.asList(values).contains(value);
     }
 
     public static boolean notIn(String value, String... values) {
         return !in(value, values);
     }
 
+    public static boolean notIn(Byte value, Byte... values) {
+        return !in(value, values);
+    }
+
     public static boolean notIn(Short value, Short... values) {
         return !in(value, values);
     }
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java
index 4adc1f2a9..ae0b48d57 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java
@@ -487,7 +487,7 @@ public class PostTransformOperator extends 
AbstractStreamOperator<Event>
                     new PostTransformer(
                             selectors,
                             TransformProjection.of(projection).orElse(null),
-                            TransformFilter.of(filterExpression, 
udfDescriptors).orElse(null),
+                            TransformFilter.of(filterExpression).orElse(null),
                             
PostTransformConverters.of(rule.getPostTransformConverter())
                                     .orElse(null),
                             rule.getSupportedMetadataColumns());
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java
index 61af9e370..ff6cc512f 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java
@@ -116,7 +116,7 @@ public class PreTransformOperator extends 
AbstractStreamOperator<Event>
                     new PreTransformer(
                             selectors,
                             TransformProjection.of(projection).orElse(null),
-                            TransformFilter.of(filter, 
udfDescriptors).orElse(null)));
+                            TransformFilter.of(filter).orElse(null)));
             schemaMetadataTransformers.add(
                     new Tuple2<>(
                             selectors,
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java
index f2cfc4256..db5c37ca2 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java
@@ -131,7 +131,6 @@ public class ProjectionColumnProcessor {
         List<Class<?>> paramTypes = new ArrayList<>();
 
         List<Column> columns = 
tableInfo.getPreTransformedSchema().getColumns();
-        String scriptExpression = projectionColumn.getScriptExpression();
         Map<String, String> columnNameMap = 
projectionColumn.getColumnNameMap();
         LinkedHashSet<String> originalColumnNames =
                 new LinkedHashSet<>(projectionColumn.getOriginalColumnNames());
@@ -171,7 +170,7 @@ public class ProjectionColumnProcessor {
 
         return TransformExpressionKey.of(
                 projectionColumn.getExpression(),
-                JaninoCompiler.loadSystemFunction(scriptExpression),
+                projectionColumn.getScriptExpression(),
                 argumentNames,
                 paramTypes,
                 JavaClassConverter.toJavaClass(projectionColumn.getDataType()),
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformExpressionCompiler.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformExpressionCompiler.java
index bc496faf9..1085a7df8 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformExpressionCompiler.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformExpressionCompiler.java
@@ -26,6 +26,8 @@ import 
org.apache.flink.shaded.guava31.com.google.common.cache.CacheBuilder;
 
 import org.codehaus.commons.compiler.CompileException;
 import org.codehaus.janino.ExpressionEvaluator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -36,6 +38,8 @@ import java.util.List;
  */
 public class TransformExpressionCompiler {
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(TransformExpressionCompiler.class);
+
     static final Cache<TransformExpressionKey, ExpressionEvaluator> 
COMPILED_EXPRESSION_CACHE =
             CacheBuilder.newBuilder().softValues().build();
 
@@ -71,9 +75,17 @@ public class TransformExpressionCompiler {
 
                         // Result type
                         
expressionEvaluator.setExpressionType(key.getReturnClass());
+
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("Going to evaluate expression: {}", 
key.getFullExpression());
+                            LOG.debug("  - Argument names: {}", argumentNames);
+                            LOG.debug("  - Argument types: {}", 
argumentClasses);
+                            LOG.debug("  - Returns: {}", key.getReturnClass());
+                        }
+
                         try {
                             // Compile
-                            expressionEvaluator.cook(key.getExpression());
+                            expressionEvaluator.cook(key.getFullExpression());
                         } catch (CompileException e) {
                             throw new InvalidProgramException(
                                     String.format(
@@ -82,7 +94,7 @@ public class TransformExpressionCompiler {
                                                     + "\tCompiled expression: 
%s\n"
                                                     + "\tColumn name map: 
{%s}",
                                             key.getOriginalExpression(),
-                                            key.getExpression(),
+                                            key.getCompiledExpression(),
                                             
TransformException.prettyPrintColumnNameMap(
                                                     key.getColumnNameMap())),
                                     e);
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformExpressionKey.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformExpressionKey.java
index 75cb13b66..5a15662e5 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformExpressionKey.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformExpressionKey.java
@@ -17,6 +17,8 @@
 
 package org.apache.flink.cdc.runtime.operators.transform;
 
+import org.apache.flink.cdc.runtime.parser.JaninoCompiler;
+
 import javax.annotation.Nullable;
 
 import java.io.Serializable;
@@ -43,7 +45,7 @@ import java.util.Objects;
 public class TransformExpressionKey implements Serializable {
     private static final long serialVersionUID = 1L;
     @Nullable private final String originalExpression;
-    private final String expression;
+    private final String compiledExpression;
     private final List<String> argumentNames;
     private final List<Class<?>> argumentClasses;
     private final Class<?> returnClass;
@@ -51,13 +53,13 @@ public class TransformExpressionKey implements Serializable 
{
 
     private TransformExpressionKey(
             @Nullable String originalExpression,
-            String expression,
+            String compiledExpression,
             List<String> argumentNames,
             List<Class<?>> argumentClasses,
             Class<?> returnClass,
             Map<String, String> columnNameMap) {
         this.originalExpression = originalExpression;
-        this.expression = expression;
+        this.compiledExpression = compiledExpression;
         this.argumentNames = argumentNames;
         this.argumentClasses = argumentClasses;
         this.returnClass = returnClass;
@@ -69,8 +71,12 @@ public class TransformExpressionKey implements Serializable {
         return originalExpression;
     }
 
-    public String getExpression() {
-        return expression;
+    public String getCompiledExpression() {
+        return compiledExpression;
+    }
+
+    public String getFullExpression() {
+        return JaninoCompiler.loadSystemFunction(compiledExpression);
     }
 
     public List<String> getArgumentNames() {
@@ -91,14 +97,14 @@ public class TransformExpressionKey implements Serializable 
{
 
     public static TransformExpressionKey of(
             @Nullable String originalExpression,
-            String expression,
+            String compiledExpression,
             List<String> argumentNames,
             List<Class<?>> argumentClasses,
             Class<?> returnClass,
             Map<String, String> columnNameMap) {
         return new TransformExpressionKey(
                 originalExpression,
-                expression,
+                compiledExpression,
                 argumentNames,
                 argumentClasses,
                 returnClass,
@@ -115,7 +121,7 @@ public class TransformExpressionKey implements Serializable 
{
         }
         TransformExpressionKey that = (TransformExpressionKey) o;
         return Objects.equals(originalExpression, that.originalExpression)
-                && expression.equals(that.expression)
+                && compiledExpression.equals(that.compiledExpression)
                 && argumentNames.equals(that.argumentNames)
                 && argumentClasses.equals(that.argumentClasses)
                 && returnClass.equals(that.returnClass)
@@ -126,7 +132,7 @@ public class TransformExpressionKey implements Serializable 
{
     public int hashCode() {
         return Objects.hash(
                 originalExpression,
-                expression,
+                compiledExpression,
                 argumentNames,
                 argumentClasses,
                 returnClass,
@@ -139,8 +145,8 @@ public class TransformExpressionKey implements Serializable 
{
                 + "originalExpression='"
                 + originalExpression
                 + '\''
-                + ", expression='"
-                + expression
+                + ", compiledExpression='"
+                + compiledExpression
                 + '\''
                 + ", argumentNames="
                 + argumentNames
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilter.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilter.java
index 3feaa0822..a9ea00dd0 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilter.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilter.java
@@ -41,17 +41,12 @@ import java.util.Optional;
 public class TransformFilter implements Serializable {
     private static final long serialVersionUID = 1L;
     private final String expression;
-    private final String scriptExpression;
     private final List<String> columnNames;
     private final Map<String, String> columnNameMap;
 
     public TransformFilter(
-            String expression,
-            String scriptExpression,
-            List<String> columnNames,
-            Map<String, String> columnNameMap) {
+            String expression, List<String> columnNames, Map<String, String> 
columnNameMap) {
         this.expression = expression;
-        this.scriptExpression = scriptExpression;
         this.columnNames = columnNames;
         this.columnNameMap = columnNameMap;
     }
@@ -60,10 +55,6 @@ public class TransformFilter implements Serializable {
         return expression;
     }
 
-    public String getScriptExpression() {
-        return scriptExpression;
-    }
-
     public List<String> getColumnNames() {
         return columnNames;
     }
@@ -76,19 +67,13 @@ public class TransformFilter implements Serializable {
         return TransformException.prettyPrintColumnNameMap(getColumnNameMap());
     }
 
-    public static Optional<TransformFilter> of(
-            String filterExpression, List<UserDefinedFunctionDescriptor> 
udfDescriptors) {
+    public static Optional<TransformFilter> of(String filterExpression) {
         if (StringUtils.isNullOrWhitespaceOnly(filterExpression)) {
             return Optional.empty();
         }
         List<String> columnNames = 
TransformParser.parseFilterColumnNameList(filterExpression);
         Map<String, String> columnNameMap = 
TransformParser.generateColumnNameMap(columnNames);
-        String scriptExpression =
-                TransformParser.translateFilterExpressionToJaninoExpression(
-                        filterExpression, udfDescriptors, columnNameMap);
-        return Optional.of(
-                new TransformFilter(
-                        filterExpression, scriptExpression, columnNames, 
columnNameMap));
+        return Optional.of(new TransformFilter(filterExpression, columnNames, 
columnNameMap));
     }
 
     public boolean isValid() {
@@ -101,9 +86,6 @@ public class TransformFilter implements Serializable {
                 + "expression='"
                 + expression
                 + '\''
-                + ", scriptExpression='"
-                + scriptExpression
-                + '\''
                 + ", columnNames="
                 + columnNames
                 + ", columnNameMap="
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java
index 299266d1b..77dc5cfe6 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java
@@ -22,6 +22,7 @@ import 
org.apache.flink.cdc.common.converter.JavaClassConverter;
 import org.apache.flink.cdc.common.schema.Column;
 import org.apache.flink.cdc.common.source.SupportedMetadataColumn;
 import org.apache.flink.cdc.runtime.parser.JaninoCompiler;
+import org.apache.flink.cdc.runtime.parser.TransformParser;
 
 import org.codehaus.janino.ExpressionEvaluator;
 
@@ -69,7 +70,13 @@ public class TransformFilterProcessor {
             this.transformExpressionKey = null;
             this.expressionEvaluator = null;
         } else {
-            this.transformExpressionKey = generateTransformExpressionKey();
+            this.transformExpressionKey =
+                    generateTransformExpressionKey(
+                            tableInfo.getPreTransformedSchema().getColumns(),
+                            udfDescriptors,
+                            supportedMetadataColumns
+                                    .values()
+                                    .toArray(new SupportedMetadataColumn[0]));
             this.expressionEvaluator =
                     TransformExpressionCompiler.compileExpression(
                             transformExpressionKey, udfDescriptors);
@@ -117,8 +124,12 @@ public class TransformFilterProcessor {
                                     + "\tCompiled expression: %s\n"
                                     + "\tColumn name map: {%s}",
                             tableInfo.getName(),
-                            transformFilter.getExpression(),
-                            transformFilter.getScriptExpression(),
+                            transformExpressionKey != null
+                                    ? 
transformExpressionKey.getOriginalExpression()
+                                    : "<no op>",
+                            transformExpressionKey != null
+                                    ? 
transformExpressionKey.getCompiledExpression()
+                                    : "<no op>",
                             transformFilter.getColumnNameMapAsString()),
                     e);
         }
@@ -200,7 +211,10 @@ public class TransformFilterProcessor {
         return params.toArray();
     }
 
-    private TransformExpressionKey generateTransformExpressionKey() {
+    private TransformExpressionKey generateTransformExpressionKey(
+            List<Column> columns,
+            List<UserDefinedFunctionDescriptor> udfDescriptors,
+            SupportedMetadataColumn[] supportedMetadataColumns) {
         Tuple2<List<String>, List<Class<?>>> args = generateArguments(true);
 
         args.f0.add(JaninoCompiler.DEFAULT_TIME_ZONE);
@@ -208,9 +222,17 @@ public class TransformFilterProcessor {
         args.f0.add(JaninoCompiler.DEFAULT_EPOCH_TIME);
         args.f1.add(Long.class);
 
+        String scriptExpression =
+                TransformParser.translateFilterExpressionToJaninoExpression(
+                        transformFilter.getExpression(),
+                        columns,
+                        udfDescriptors,
+                        supportedMetadataColumns,
+                        transformFilter.getColumnNameMap());
+
         return TransformExpressionKey.of(
                 transformFilter.getExpression(),
-                
JaninoCompiler.loadSystemFunction(transformFilter.getScriptExpression()),
+                scriptExpression,
                 args.f0,
                 args.f1,
                 Boolean.class,
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java
index b7fcfa0ea..4345331ac 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java
@@ -21,6 +21,10 @@ import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.io.ParseException;
 import org.apache.flink.cdc.common.annotation.VisibleForTesting;
 import org.apache.flink.cdc.common.converter.JavaClassConverter;
+import org.apache.flink.cdc.common.schema.Column;
+import org.apache.flink.cdc.common.source.SupportedMetadataColumn;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypeRoot;
 import org.apache.flink.cdc.common.utils.StringUtils;
 import 
org.apache.flink.cdc.runtime.operators.transform.UserDefinedFunctionDescriptor;
 
@@ -78,7 +82,8 @@ public class JaninoCompiler {
                     "TIMESTAMPADD",
                     "TIMESTAMPDIFF",
                     "TIMESTAMP_DIFF",
-                    "DATE_FORMAT");
+                    "DATE_FORMAT",
+                    "DATE_ADD");
 
     public static final String DEFAULT_EPOCH_TIME = "__epoch_time__";
     public static final String DEFAULT_TIME_ZONE = "__time_zone__";
@@ -121,54 +126,47 @@ public class JaninoCompiler {
         }
     }
 
-    public static String translateSqlNodeToJaninoExpression(
-            SqlNode transform,
-            List<UserDefinedFunctionDescriptor> udfDescriptors,
-            Map<String, String> columnNameMap) {
-        Java.Rvalue rvalue =
-                translateSqlNodeToJaninoRvalue(transform, udfDescriptors, 
columnNameMap);
+    public static String translateSqlNodeToJaninoExpression(Context context, 
SqlNode transform) {
+        Java.Rvalue rvalue = translateSqlNodeToJaninoRvalue(context, 
transform);
         if (rvalue != null) {
             return rvalue.toString();
         }
         return "";
     }
 
-    public static Java.Rvalue translateSqlNodeToJaninoRvalue(
-            SqlNode transform,
-            List<UserDefinedFunctionDescriptor> udfDescriptors,
-            Map<String, String> columnNameMap) {
+    public static Java.Rvalue translateSqlNodeToJaninoRvalue(Context context, 
SqlNode transform) {
         if (transform instanceof SqlIdentifier) {
-            return translateSqlIdentifier((SqlIdentifier) transform, 
columnNameMap);
+            return translateSqlIdentifier(context, (SqlIdentifier) transform);
         } else if (transform instanceof SqlBasicCall) {
-            return translateSqlBasicCall((SqlBasicCall) transform, 
udfDescriptors, columnNameMap);
+            return translateSqlBasicCall(context, (SqlBasicCall) transform);
         } else if (transform instanceof SqlCase) {
-            return translateSqlCase((SqlCase) transform, udfDescriptors, 
columnNameMap);
+            return translateSqlCase(context, (SqlCase) transform);
         } else if (transform instanceof SqlLiteral) {
-            return translateSqlSqlLiteral((SqlLiteral) transform);
+            return translateSqlSqlLiteral(context, (SqlLiteral) transform);
         }
         return null;
     }
 
     private static Java.Rvalue translateSqlIdentifier(
-            SqlIdentifier sqlIdentifier, Map<String, String> columnNameMap) {
+            Context context, SqlIdentifier sqlIdentifier) {
         String columnName = sqlIdentifier.names.get(sqlIdentifier.names.size() 
- 1);
         if 
(TIMEZONE_FREE_TEMPORAL_FUNCTIONS.contains(columnName.toUpperCase())) {
-            return generateTimezoneFreeTemporalFunctionOperation(columnName);
+            return generateTimezoneFreeTemporalFunctionOperation(context, 
columnName);
         } else if 
(TIMEZONE_REQUIRED_TEMPORAL_FUNCTIONS.contains(columnName.toUpperCase())) {
-            return 
generateTimezoneRequiredTemporalFunctionOperation(columnName);
+            return generateTimezoneRequiredTemporalFunctionOperation(context, 
columnName);
         } else if 
(TIMEZONE_FREE_TEMPORAL_CONVERSION_FUNCTIONS.contains(columnName.toUpperCase()))
 {
-            return 
generateTimezoneFreeTemporalConversionFunctionOperation(columnName);
+            return 
generateTimezoneFreeTemporalConversionFunctionOperation(context, columnName);
         } else if (TIMEZONE_REQUIRED_TEMPORAL_CONVERSION_FUNCTIONS.contains(
                 columnName.toUpperCase())) {
-            return 
generateTimezoneRequiredTemporalConversionFunctionOperation(columnName);
+            return 
generateTimezoneRequiredTemporalConversionFunctionOperation(context, 
columnName);
         } else {
             return new Java.AmbiguousName(
                     Location.NOWHERE,
-                    new String[] {columnNameMap.getOrDefault(columnName, 
columnName)});
+                    new String[] 
{context.columnNameMap.getOrDefault(columnName, columnName)});
         }
     }
 
-    private static Java.Rvalue translateSqlSqlLiteral(SqlLiteral sqlLiteral) {
+    private static Java.Rvalue translateSqlSqlLiteral(Context context, 
SqlLiteral sqlLiteral) {
         if (sqlLiteral.getValue() == null) {
             return new Java.NullLiteral(Location.NOWHERE);
         }
@@ -190,14 +188,11 @@ public class JaninoCompiler {
         return new Java.AmbiguousName(Location.NOWHERE, new String[] 
{value.toString()});
     }
 
-    private static Java.Rvalue translateSqlBasicCall(
-            SqlBasicCall sqlBasicCall,
-            List<UserDefinedFunctionDescriptor> udfDescriptors,
-            Map<String, String> columnNameMap) {
+    private static Java.Rvalue translateSqlBasicCall(Context context, 
SqlBasicCall sqlBasicCall) {
         List<SqlNode> operandList = sqlBasicCall.getOperandList();
         List<Java.Rvalue> atoms = new ArrayList<>();
         for (SqlNode sqlNode : operandList) {
-            translateSqlNodeToAtoms(sqlNode, atoms, udfDescriptors, 
columnNameMap);
+            translateSqlNodeToAtoms(context, sqlNode, atoms);
         }
         if (TIMEZONE_FREE_TEMPORAL_FUNCTIONS.contains(
                 sqlBasicCall.getOperator().getName().toUpperCase())) {
@@ -210,27 +205,22 @@ public class JaninoCompiler {
                 sqlBasicCall.getOperator().getName().toUpperCase())) {
             atoms.add(new Java.AmbiguousName(Location.NOWHERE, new String[] 
{DEFAULT_TIME_ZONE}));
         }
-        return sqlBasicCallToJaninoRvalue(
-                sqlBasicCall, atoms.toArray(new Java.Rvalue[0]), 
udfDescriptors);
+        return sqlBasicCallToJaninoRvalue(context, sqlBasicCall, 
atoms.toArray(new Java.Rvalue[0]));
     }
 
-    private static Java.Rvalue translateSqlCase(
-            SqlCase sqlCase,
-            List<UserDefinedFunctionDescriptor> udfDescriptors,
-            Map<String, String> columnNameMap) {
+    private static Java.Rvalue translateSqlCase(Context context, SqlCase 
sqlCase) {
         SqlNodeList whenOperands = sqlCase.getWhenOperands();
         SqlNodeList thenOperands = sqlCase.getThenOperands();
         SqlNode elseOperand = sqlCase.getElseOperand();
         List<Java.Rvalue> whenAtoms = new ArrayList<>();
         for (SqlNode sqlNode : whenOperands) {
-            translateSqlNodeToAtoms(sqlNode, whenAtoms, udfDescriptors, 
columnNameMap);
+            translateSqlNodeToAtoms(context, sqlNode, whenAtoms);
         }
         List<Java.Rvalue> thenAtoms = new ArrayList<>();
         for (SqlNode sqlNode : thenOperands) {
-            translateSqlNodeToAtoms(sqlNode, thenAtoms, udfDescriptors, 
columnNameMap);
+            translateSqlNodeToAtoms(context, sqlNode, thenAtoms);
         }
-        Java.Rvalue elseAtoms =
-                translateSqlNodeToJaninoRvalue(elseOperand, udfDescriptors, 
columnNameMap);
+        Java.Rvalue elseAtoms = translateSqlNodeToJaninoRvalue(context, 
elseOperand);
         Java.Rvalue sqlCaseRvalueTemp = elseAtoms;
         for (int i = whenAtoms.size() - 1; i >= 0; i--) {
             sqlCaseRvalueTemp =
@@ -244,50 +234,46 @@ public class JaninoCompiler {
     }
 
     private static void translateSqlNodeToAtoms(
-            SqlNode sqlNode,
-            List<Java.Rvalue> atoms,
-            List<UserDefinedFunctionDescriptor> udfDescriptors,
-            Map<String, String> columnNameMap) {
+            Context context, SqlNode sqlNode, List<Java.Rvalue> atoms) {
         if (sqlNode instanceof SqlIdentifier) {
-            atoms.add(translateSqlIdentifier((SqlIdentifier) sqlNode, 
columnNameMap));
+            atoms.add(translateSqlIdentifier(context, (SqlIdentifier) 
sqlNode));
         } else if (sqlNode instanceof SqlLiteral) {
-            atoms.add(translateSqlSqlLiteral((SqlLiteral) sqlNode));
+            atoms.add(translateSqlSqlLiteral(context, (SqlLiteral) sqlNode));
         } else if (sqlNode instanceof SqlBasicCall) {
-            atoms.add(translateSqlBasicCall((SqlBasicCall) sqlNode, 
udfDescriptors, columnNameMap));
+            atoms.add(translateSqlBasicCall(context, (SqlBasicCall) sqlNode));
         } else if (sqlNode instanceof SqlNodeList) {
             for (SqlNode node : (SqlNodeList) sqlNode) {
-                translateSqlNodeToAtoms(node, atoms, udfDescriptors, 
columnNameMap);
+                translateSqlNodeToAtoms(context, node, atoms);
             }
         } else if (sqlNode instanceof SqlCase) {
-            atoms.add(translateSqlCase((SqlCase) sqlNode, udfDescriptors, 
columnNameMap));
+            atoms.add(translateSqlCase(context, (SqlCase) sqlNode));
         }
     }
 
     private static Java.Rvalue sqlBasicCallToJaninoRvalue(
-            SqlBasicCall sqlBasicCall,
-            Java.Rvalue[] atoms,
-            List<UserDefinedFunctionDescriptor> udfDescriptors) {
+            Context context, SqlBasicCall sqlBasicCall, Java.Rvalue[] atoms) {
         switch (sqlBasicCall.getKind()) {
             case AND:
-                return generateBinaryOperation(sqlBasicCall, atoms, "&&");
+                return generateBinaryOperation(context, sqlBasicCall, atoms, 
"&&");
             case OR:
-                return generateBinaryOperation(sqlBasicCall, atoms, "||");
+                return generateBinaryOperation(context, sqlBasicCall, atoms, 
"||");
             case NOT:
-                return generateUnaryOperation("!", atoms[0]);
+                return generateUnaryOperation(context, "!", atoms[0]);
             case EQUALS:
-                return generateEqualsOperation(sqlBasicCall, atoms);
+                return generateEqualsOperation(context, sqlBasicCall, atoms);
             case NOT_EQUALS:
-                return generateUnaryOperation("!", 
generateEqualsOperation(sqlBasicCall, atoms));
+                return generateUnaryOperation(
+                        context, "!", generateEqualsOperation(context, 
sqlBasicCall, atoms));
             case IS_NULL:
-                return generateUnaryOperation("null == ", atoms[0]);
+                return generateUnaryOperation(context, "null == ", atoms[0]);
             case IS_NOT_NULL:
-                return generateUnaryOperation("null != ", atoms[0]);
+                return generateUnaryOperation(context, "null != ", atoms[0]);
             case IS_FALSE:
             case IS_NOT_TRUE:
-                return generateUnaryOperation("false == ", atoms[0]);
+                return generateUnaryOperation(context, "false == ", atoms[0]);
             case IS_TRUE:
             case IS_NOT_FALSE:
-                return generateUnaryOperation("true == ", atoms[0]);
+                return generateUnaryOperation(context, "true == ", atoms[0]);
             case BETWEEN:
             case IN:
             case NOT_IN:
@@ -296,49 +282,69 @@ public class JaninoCompiler {
             case FLOOR:
             case TRIM:
             case OTHER_FUNCTION:
-                return generateOtherFunctionOperation(sqlBasicCall, atoms, 
udfDescriptors);
+                return generateOtherFunctionOperation(context, sqlBasicCall, 
atoms);
             case PLUS:
-                return generateBinaryOperation(sqlBasicCall, atoms, "+");
+                return generateBinaryOperation(context, sqlBasicCall, atoms, 
"+");
             case MINUS:
-                return generateBinaryOperation(sqlBasicCall, atoms, "-");
+                return generateBinaryOperation(context, sqlBasicCall, atoms, 
"-");
             case TIMES:
-                return generateBinaryOperation(sqlBasicCall, atoms, "*");
+                return generateBinaryOperation(context, sqlBasicCall, atoms, 
"*");
             case DIVIDE:
-                return generateBinaryOperation(sqlBasicCall, atoms, "/");
+                return generateBinaryOperation(context, sqlBasicCall, atoms, 
"/");
             case MOD:
-                return generateBinaryOperation(sqlBasicCall, atoms, "%");
+                return generateBinaryOperation(context, sqlBasicCall, atoms, 
"%");
             case LESS_THAN:
             case GREATER_THAN:
             case LESS_THAN_OR_EQUAL:
             case GREATER_THAN_OR_EQUAL:
-                return generateCompareOperation(sqlBasicCall, atoms);
+                return generateCompareOperation(context, sqlBasicCall, atoms);
             case CAST:
-                return generateCastOperation(sqlBasicCall, atoms);
+                return generateCastOperation(context, sqlBasicCall, atoms);
             case TIMESTAMP_DIFF:
-                return generateTimestampDiffOperation(sqlBasicCall, atoms);
+                return generateTimestampDiffOperation(context, sqlBasicCall, 
atoms);
             case TIMESTAMP_ADD:
-                return generateTimestampAddOperation(sqlBasicCall, atoms);
+                return generateTimestampAddOperation(context, sqlBasicCall, 
atoms);
             case OTHER:
-                return generateOtherOperation(sqlBasicCall, atoms);
+                return generateOtherOperation(context, sqlBasicCall, atoms);
             default:
-                throw new ParseException("Unrecognized expression: " + 
sqlBasicCall.toString());
+                throw new ParseException("Unrecognized expression: " + 
sqlBasicCall);
         }
     }
 
-    private static Java.Rvalue generateUnaryOperation(String operator, 
Java.Rvalue atom) {
+    private static Java.Rvalue generateUnaryOperation(
+            Context context, String operator, Java.Rvalue atom) {
         return new Java.UnaryOperation(Location.NOWHERE, operator, atom);
     }
 
+    private static final Map<String, String> decimalArithmeticHandlers =
+            Map.of(
+                    "+", "plus",
+                    "-", "minus",
+                    "*", "times",
+                    "/", "divides");
+
     private static Java.Rvalue generateBinaryOperation(
-            SqlBasicCall sqlBasicCall, Java.Rvalue[] atoms, String operator) {
+            Context context, SqlBasicCall sqlBasicCall, Java.Rvalue[] atoms, 
String operator) {
         if (atoms.length != 2) {
             throw new ParseException("Unrecognized expression: " + 
sqlBasicCall.toString());
         }
+        if (decimalArithmeticHandlers.containsKey(operator)) {
+            String handler = decimalArithmeticHandlers.get(operator);
+            DataType resultType =
+                    TransformParser.deduceSubExpressionType(
+                            context.columns,
+                            sqlBasicCall,
+                            context.udfDescriptors,
+                            context.supportedMetadataColumns);
+            if (resultType.is(DataTypeRoot.DECIMAL)) {
+                return new Java.MethodInvocation(Location.NOWHERE, null, 
handler, atoms);
+            }
+        }
         return new Java.BinaryOperation(Location.NOWHERE, atoms[0], operator, 
atoms[1]);
     }
 
     private static Java.Rvalue generateEqualsOperation(
-            SqlBasicCall sqlBasicCall, Java.Rvalue[] atoms) {
+            Context context, SqlBasicCall sqlBasicCall, Java.Rvalue[] atoms) {
         if (atoms.length != 2) {
             throw new ParseException("Unrecognized expression: " + 
sqlBasicCall.toString());
         }
@@ -347,17 +353,17 @@ public class JaninoCompiler {
     }
 
     private static Java.Rvalue generateCastOperation(
-            SqlBasicCall sqlBasicCall, Java.Rvalue[] atoms) {
+            Context context, SqlBasicCall sqlBasicCall, Java.Rvalue[] atoms) {
         if (atoms.length != 1) {
             throw new ParseException("Unrecognized expression: " + 
sqlBasicCall.toString());
         }
         List<SqlNode> operandList = sqlBasicCall.getOperandList();
         SqlDataTypeSpec sqlDataTypeSpec = (SqlDataTypeSpec) operandList.get(1);
-        return generateTypeConvertMethod(sqlDataTypeSpec, atoms);
+        return generateTypeConvertMethod(context, sqlDataTypeSpec, atoms);
     }
 
     private static Java.Rvalue generateCompareOperation(
-            SqlBasicCall sqlBasicCall, Java.Rvalue[] atoms) {
+            Context context, SqlBasicCall sqlBasicCall, Java.Rvalue[] atoms) {
         if (atoms.length != 2) {
             throw new ParseException("Unrecognized expression: " + 
sqlBasicCall.toString());
         }
@@ -385,7 +391,7 @@ public class JaninoCompiler {
     }
 
     private static Java.Rvalue generateTimestampDiffOperation(
-            SqlBasicCall sqlBasicCall, Java.Rvalue[] atoms) {
+            Context context, SqlBasicCall sqlBasicCall, Java.Rvalue[] atoms) {
         if (atoms.length != 4) {
             throw new ParseException("Unrecognized expression: " + 
sqlBasicCall.toString());
         }
@@ -417,7 +423,7 @@ public class JaninoCompiler {
     }
 
     private static Java.Rvalue generateTimestampAddOperation(
-            SqlBasicCall sqlBasicCall, Java.Rvalue[] atoms) {
+            Context context, SqlBasicCall sqlBasicCall, Java.Rvalue[] atoms) {
         if (atoms.length != 4) {
             throw new ParseException("Unrecognized expression: " + 
sqlBasicCall.toString());
         }
@@ -448,13 +454,13 @@ public class JaninoCompiler {
                 timestampDiffFunctionParam.toArray(new Java.Rvalue[0]));
     }
 
-    private static Java.Rvalue generateCharLengthOperation(Java.Rvalue[] 
atoms) {
+    private static Java.Rvalue generateCharLengthOperation(Context context, 
Java.Rvalue[] atoms) {
         return new Java.MethodInvocation(
                 Location.NOWHERE, null, 
StringUtils.convertToCamelCase("CHAR_LENGTH"), atoms);
     }
 
     private static Java.Rvalue generateOtherOperation(
-            SqlBasicCall sqlBasicCall, Java.Rvalue[] atoms) {
+            Context context, SqlBasicCall sqlBasicCall, Java.Rvalue[] atoms) {
         if (sqlBasicCall.getOperator().getName().equals("||")) {
             return new Java.MethodInvocation(
                     Location.NOWHERE, null, 
StringUtils.convertToCamelCase("CONCAT"), atoms);
@@ -463,9 +469,7 @@ public class JaninoCompiler {
     }
 
     private static Java.Rvalue generateOtherFunctionOperation(
-            SqlBasicCall sqlBasicCall,
-            Java.Rvalue[] atoms,
-            List<UserDefinedFunctionDescriptor> udfDescriptors) {
+            Context context, SqlBasicCall sqlBasicCall, Java.Rvalue[] atoms) {
         String operationName = 
sqlBasicCall.getOperator().getName().toUpperCase();
         if (operationName.equals("IF")) {
             if (atoms.length == 3) {
@@ -476,7 +480,7 @@ public class JaninoCompiler {
             }
         } else {
             Optional<UserDefinedFunctionDescriptor> udfFunctionOptional =
-                    udfDescriptors.stream()
+                    context.udfDescriptors.stream()
                             .filter(e -> 
e.getName().equalsIgnoreCase(operationName))
                             .findFirst();
             return udfFunctionOptional
@@ -498,7 +502,8 @@ public class JaninoCompiler {
         }
     }
 
-    private static Java.Rvalue 
generateTimezoneFreeTemporalFunctionOperation(String operationName) {
+    private static Java.Rvalue generateTimezoneFreeTemporalFunctionOperation(
+            Context context, String operationName) {
         return new Java.MethodInvocation(
                 Location.NOWHERE,
                 null,
@@ -509,7 +514,7 @@ public class JaninoCompiler {
     }
 
     private static Java.Rvalue 
generateTimezoneRequiredTemporalFunctionOperation(
-            String operationName) {
+            Context context, String operationName) {
         List<Java.Rvalue> timestampFunctionParam = new ArrayList<>();
         timestampFunctionParam.add(
                 new Java.AmbiguousName(Location.NOWHERE, new String[] 
{DEFAULT_EPOCH_TIME}));
@@ -523,7 +528,7 @@ public class JaninoCompiler {
     }
 
     private static Java.Rvalue 
generateTimezoneFreeTemporalConversionFunctionOperation(
-            String operationName) {
+            Context context, String operationName) {
         return new Java.MethodInvocation(
                 Location.NOWHERE,
                 null,
@@ -532,7 +537,7 @@ public class JaninoCompiler {
     }
 
     private static Java.Rvalue 
generateTimezoneRequiredTemporalConversionFunctionOperation(
-            String operationName) {
+            Context context, String operationName) {
         return new Java.MethodInvocation(
                 Location.NOWHERE,
                 null,
@@ -543,7 +548,7 @@ public class JaninoCompiler {
     }
 
     private static Java.Rvalue generateTypeConvertMethod(
-            SqlDataTypeSpec sqlDataTypeSpec, Java.Rvalue[] atoms) {
+            Context context, SqlDataTypeSpec sqlDataTypeSpec, Java.Rvalue[] 
atoms) {
         switch (sqlDataTypeSpec.getTypeName().getSimple().toUpperCase()) {
             case "BOOLEAN":
                 return new Java.MethodInvocation(Location.NOWHERE, null, 
"castToBoolean", atoms);
@@ -614,4 +619,39 @@ public class JaninoCompiler {
             return String.format("__instanceOf%s.eval", 
udfFunction.getClassName());
         }
     }
+
+    /** Contextual information for {@link JaninoCompiler}. */
+    public static class Context {
+
+        // Upstream physical columns
+        public final List<Column> columns;
+
+        // Mangled column name map to $1, $2...
+        public final Map<String, String> columnNameMap;
+
+        // User defined function signatures
+        public final List<UserDefinedFunctionDescriptor> udfDescriptors;
+
+        // Readable metadata columns
+        public final SupportedMetadataColumn[] supportedMetadataColumns;
+
+        private Context(
+                List<Column> columns,
+                Map<String, String> columnNameMap,
+                List<UserDefinedFunctionDescriptor> udfDescriptors,
+                SupportedMetadataColumn[] supportedMetadataColumns) {
+            this.columns = columns;
+            this.columnNameMap = columnNameMap;
+            this.udfDescriptors = udfDescriptors;
+            this.supportedMetadataColumns = supportedMetadataColumns;
+        }
+
+        public static Context of(
+                List<Column> columns,
+                Map<String, String> columnNameMap,
+                List<UserDefinedFunctionDescriptor> udfDescriptors,
+                SupportedMetadataColumn[] supportedMetadataColumns) {
+            return new Context(columns, columnNameMap, udfDescriptors, 
supportedMetadataColumns);
+        }
+    }
 }
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java
index 3369c482d..5ee715301 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java
@@ -348,7 +348,12 @@ public class TransformParser {
                                             relDataType),
                                     exprNode.toString(),
                                     
JaninoCompiler.translateSqlNodeToJaninoExpression(
-                                            exprNode, udfDescriptors, 
columnNameMap),
+                                            JaninoCompiler.Context.of(
+                                                    columns,
+                                                    columnNameMap,
+                                                    udfDescriptors,
+                                                    supportedMetadataColumns),
+                                            exprNode),
                                     originalColumnNames,
                                     columnNameMap);
                 }
@@ -423,7 +428,9 @@ public class TransformParser {
 
     public static String translateFilterExpressionToJaninoExpression(
             String filterExpression,
+            List<Column> columns,
             List<UserDefinedFunctionDescriptor> udfDescriptors,
+            SupportedMetadataColumn[] supportedMetadataColumns,
             Map<String, String> columnNameMap) {
         if (isNullOrWhitespaceOnly(filterExpression)) {
             return "";
@@ -434,7 +441,9 @@ public class TransformParser {
         }
         SqlNode where = sqlSelect.getWhere();
         return JaninoCompiler.translateSqlNodeToJaninoExpression(
-                where, udfDescriptors, columnNameMap);
+                JaninoCompiler.Context.of(
+                        columns, columnNameMap, udfDescriptors, 
supportedMetadataColumns),
+                where);
     }
 
     public static List<String> parseComputedColumnNames(
@@ -597,4 +606,37 @@ public class TransformParser {
         }
         return columnNameMap;
     }
+
+    public static DataType deduceSubExpressionType(
+            List<Column> columns,
+            SqlNode subExpression,
+            List<UserDefinedFunctionDescriptor> udfDescriptors,
+            SupportedMetadataColumn[] supportedMetadataColumns) {
+        SqlSelect sqlSelect =
+                new SqlSelect(
+                        SqlParserPos.QUOTED_ZERO,
+                        SqlNodeList.EMPTY,
+                        SqlNodeList.of(subExpression),
+                        new SqlIdentifier(DEFAULT_TABLE, 
SqlParserPos.QUOTED_ZERO),
+                        null,
+                        null,
+                        null,
+                        null,
+                        null,
+                        null,
+                        null,
+                        null);
+        RelNode relNode = sqlToRel(columns, sqlSelect, udfDescriptors, 
supportedMetadataColumns);
+        RelDataType[] relDataTypes =
+                relNode.getRowType().getFieldList().stream()
+                        .map(RelDataTypeField::getType)
+                        .toArray(RelDataType[]::new);
+        Preconditions.checkArgument(
+                relDataTypes.length == 1,
+                "RelDataType %s should be unary from SqlNode %s",
+                relDataTypes,
+                sqlSelect);
+        RelDataType expressionType = relDataTypes[0];
+        return 
CalciteDataTypeConverter.convertCalciteRelDataTypeToDataType(expressionType);
+    }
 }
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java
index 106b50524..468566b82 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java
@@ -241,6 +241,8 @@ public class TransformSqlOperatorTable extends 
ReflectiveSqlOperatorTable {
                     InferTypes.RETURN_TYPE,
                     OperandTypes.or(
                             OperandTypes.family(SqlTypeFamily.TIMESTAMP, 
SqlTypeFamily.STRING),
+                            OperandTypes.family(SqlTypeFamily.DATE, 
SqlTypeFamily.STRING),
+                            OperandTypes.family(SqlTypeFamily.TIME, 
SqlTypeFamily.STRING),
                             OperandTypes.family(SqlTypeFamily.STRING, 
SqlTypeFamily.STRING)),
                     SqlFunctionCategory.TIMEDATE);
     public static final SqlFunction TIMESTAMP_DIFF =
@@ -298,6 +300,50 @@ public class TransformSqlOperatorTable extends 
ReflectiveSqlOperatorTable {
                             OperandTypes.family(SqlTypeFamily.CHARACTER),
                             OperandTypes.family(SqlTypeFamily.CHARACTER, 
SqlTypeFamily.CHARACTER)),
                     SqlFunctionCategory.TIMEDATE);
+    public static final SqlFunction TO_TIMESTAMP_LTZ =
+            new SqlFunction(
+                    "TO_TIMESTAMP_LTZ",
+                    SqlKind.OTHER_FUNCTION,
+                    ReturnTypes.cascade(
+                            
ReturnTypes.explicit(SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE, 3),
+                            SqlTypeTransforms.FORCE_NULLABLE),
+                    null,
+                    OperandTypes.or(
+                            OperandTypes.family(SqlTypeFamily.INTEGER),
+                            OperandTypes.family(SqlTypeFamily.INTEGER, 
SqlTypeFamily.INTEGER),
+                            OperandTypes.family(SqlTypeFamily.CHARACTER),
+                            OperandTypes.family(SqlTypeFamily.CHARACTER, 
SqlTypeFamily.CHARACTER),
+                            OperandTypes.family(
+                                    SqlTypeFamily.CHARACTER,
+                                    SqlTypeFamily.CHARACTER,
+                                    SqlTypeFamily.CHARACTER)),
+                    SqlFunctionCategory.TIMEDATE);
+
+    public static final SqlFunction DATE_FORMAT_TZ =
+            new SqlFunction(
+                    "DATE_FORMAT_TZ",
+                    SqlKind.OTHER_FUNCTION,
+                    TransformSqlReturnTypes.VARCHAR_FORCE_NULLABLE,
+                    null,
+                    OperandTypes.or(
+                            OperandTypes.family(SqlTypeFamily.TIMESTAMP, 
SqlTypeFamily.CHARACTER),
+                            OperandTypes.family(
+                                    SqlTypeFamily.TIMESTAMP,
+                                    SqlTypeFamily.CHARACTER,
+                                    SqlTypeFamily.CHARACTER)),
+                    SqlFunctionCategory.TIMEDATE);
+
+    public static final SqlFunction DATE_ADD =
+            new SqlFunction(
+                    "DATE_ADD",
+                    SqlKind.OTHER_FUNCTION,
+                    TransformSqlReturnTypes.VARCHAR_FORCE_NULLABLE,
+                    null,
+                    OperandTypes.or(
+                            OperandTypes.family(SqlTypeFamily.TIMESTAMP, 
SqlTypeFamily.INTEGER),
+                            OperandTypes.family(SqlTypeFamily.DATE, 
SqlTypeFamily.INTEGER),
+                            OperandTypes.family(SqlTypeFamily.CHARACTER, 
SqlTypeFamily.INTEGER)),
+                    SqlFunctionCategory.TIMEDATE);
 
     // ---------------------
     // Conditional Functions
diff --git 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java
 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java
index 94acbffaf..3f1d02686 100644
--- 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java
+++ 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java
@@ -398,6 +398,8 @@ class TransformParserTest {
                             
TransformParser.translateFilterExpressionToJaninoExpression(
                                     "TIMESTAMPDIFF(SECONDS, dt1, dt2)",
                                     Collections.emptyList(),
+                                    Collections.emptyList(),
+                                    new SupportedMetadataColumn[0],
                                     Collections.emptyMap());
                         })
                 .isExactlyInstanceOf(ParseException.class)
@@ -407,6 +409,8 @@ class TransformParserTest {
                             
TransformParser.translateFilterExpressionToJaninoExpression(
                                     "TIMESTAMPDIFF(QUARTER, dt1, dt2)",
                                     Collections.emptyList(),
+                                    Collections.emptyList(),
+                                    new SupportedMetadataColumn[0],
                                     Collections.emptyMap());
                         })
                 .isExactlyInstanceOf(ParseException.class)
@@ -417,6 +421,8 @@ class TransformParserTest {
                             
TransformParser.translateFilterExpressionToJaninoExpression(
                                     "TIMESTAMPADD(SECONDS, dt1, dt2)",
                                     Collections.emptyList(),
+                                    Collections.emptyList(),
+                                    new SupportedMetadataColumn[0],
                                     Collections.emptyMap());
                         })
                 .isExactlyInstanceOf(ParseException.class)
@@ -426,6 +432,8 @@ class TransformParserTest {
                             
TransformParser.translateFilterExpressionToJaninoExpression(
                                     "TIMESTAMPADD(QUARTER, dt1, dt2)",
                                     Collections.emptyList(),
+                                    Collections.emptyList(),
+                                    new SupportedMetadataColumn[0],
                                     Collections.emptyMap());
                         })
                 .isExactlyInstanceOf(ParseException.class)
@@ -622,6 +630,12 @@ class TransformParserTest {
 
     @Test
     public void testTranslateUdfFilterToJaninoExpressionWithColumnNameMap() {
+        List<Column> columns =
+                List.of(
+                        Column.physicalColumn("a", DataTypes.INT()),
+                        Column.physicalColumn("b", DataTypes.INT()),
+                        Column.physicalColumn("a-b", DataTypes.INT()));
+
         Map<String, String> columnNameMap = new HashMap<>();
         columnNameMap.put("a", "$0");
         columnNameMap.put("b", "$1");
@@ -630,42 +644,52 @@ class TransformParserTest {
         testFilterExpressionWithUdf(
                 "format(upper(a))",
                 "__instanceOfFormatFunctionClass.eval(upper($0))",
+                columns,
                 columnNameMap);
         testFilterExpressionWithUdf(
                 "format(lower(b))",
                 "__instanceOfFormatFunctionClass.eval(lower($1))",
+                columns,
                 columnNameMap);
         testFilterExpressionWithUdf(
                 "format(concat(a,b))",
                 "__instanceOfFormatFunctionClass.eval(concat($0, $1))",
+                columns,
                 columnNameMap);
         testFilterExpressionWithUdf(
                 "format(SUBSTR(`a-b`,1))",
                 "__instanceOfFormatFunctionClass.eval(substr($2, 1))",
+                columns,
                 columnNameMap);
         testFilterExpressionWithUdf(
                 "typeof(`a-b` like '^[a-zA-Z]')",
                 "__instanceOfTypeOfFunctionClass.eval(like($2, 
\"^[a-zA-Z]\"))",
+                columns,
                 columnNameMap);
         testFilterExpressionWithUdf(
                 "typeof(`a-b` not like '^[a-zA-Z]')",
                 "__instanceOfTypeOfFunctionClass.eval(notLike($2, 
\"^[a-zA-Z]\"))",
+                columns,
                 columnNameMap);
         testFilterExpressionWithUdf(
                 "typeof(a-b-`a-b`)",
                 "__instanceOfTypeOfFunctionClass.eval($0 - $1 - $2)",
+                columns,
                 columnNameMap);
         testFilterExpressionWithUdf(
                 "typeof(a-b-2)",
                 "__instanceOfTypeOfFunctionClass.eval($0 - $1 - 2)",
+                columns,
                 columnNameMap);
         testFilterExpressionWithUdf(
                 "addone(addone(`a-b`)) > 4 OR typeof(a-b) <> 'bool' AND 
format('from %s to %s is %s', 'a', 'z', 'lie') <> ''",
                 
"greaterThan(__instanceOfAddOneFunctionClass.eval(__instanceOfAddOneFunctionClass.eval($2)),
 4) || !valueEquals(__instanceOfTypeOfFunctionClass.eval($0 - $1), \"bool\") && 
!valueEquals(__instanceOfFormatFunctionClass.eval(\"from %s to %s is %s\", 
\"a\", \"z\", \"lie\"), \"\")",
+                columns,
                 columnNameMap);
         testFilterExpressionWithUdf(
                 "ADDONE(ADDONE(`a-b`)) > 4 OR TYPEOF(a-b) <> 'bool' AND 
FORMAT('from %s to %s is %s', 'a', 'z', 'lie') <> ''",
                 
"greaterThan(__instanceOfAddOneFunctionClass.eval(__instanceOfAddOneFunctionClass.eval($2)),
 4) || !valueEquals(__instanceOfTypeOfFunctionClass.eval($0 - $1), \"bool\") && 
!valueEquals(__instanceOfFormatFunctionClass.eval(\"from %s to %s is %s\", 
\"a\", \"z\", \"lie\"), \"\")",
+                columns,
                 columnNameMap);
     }
 
@@ -687,6 +711,8 @@ class TransformParserTest {
                                 
TransformParser.translateFilterExpressionToJaninoExpression(
                                         "id > 9223372036854775808",
                                         Collections.emptyList(),
+                                        Collections.emptyList(),
+                                        new SupportedMetadataColumn[0],
                                         Collections.emptyMap()))
                 .isExactlyInstanceOf(CalciteContextException.class)
                 .hasMessageContaining("Numeric literal '9223372036854775808' 
out of range");
@@ -696,6 +722,8 @@ class TransformParserTest {
                                 
TransformParser.translateFilterExpressionToJaninoExpression(
                                         "id < -9223372036854775809",
                                         Collections.emptyList(),
+                                        Collections.emptyList(),
+                                        new SupportedMetadataColumn[0],
                                         Collections.emptyMap()))
                 .isExactlyInstanceOf(CalciteContextException.class)
                 .hasMessageContaining("Numeric literal '-9223372036854775809' 
out of range");
@@ -776,22 +804,34 @@ class TransformParserTest {
         }
     }
 
+    private static final List<Column> DUMMY_COLUMNS =
+            List.of(Column.physicalColumn("id", DataTypes.INT()));
+
     private void testFilterExpression(String expression, String 
expressionExpect) {
         String janinoExpression =
                 TransformParser.translateFilterExpressionToJaninoExpression(
-                        expression, Collections.emptyList(), 
Collections.emptyMap());
+                        expression,
+                        DUMMY_COLUMNS,
+                        Collections.emptyList(),
+                        new SupportedMetadataColumn[0],
+                        Collections.emptyMap());
         Assertions.assertThat(janinoExpression).isEqualTo(expressionExpect);
     }
 
     private void testFilterExpressionWithUdf(String expression, String 
expressionExpect) {
-        testFilterExpressionWithUdf(expression, expressionExpect, 
Collections.emptyMap());
+        testFilterExpressionWithUdf(
+                expression, expressionExpect, DUMMY_COLUMNS, 
Collections.emptyMap());
     }
 
     private void testFilterExpressionWithUdf(
-            String expression, String expressionExpect, Map<String, String> 
columnNameMap) {
+            String expression,
+            String expressionExpect,
+            List<Column> columns,
+            Map<String, String> columnNameMap) {
         String janinoExpression =
                 TransformParser.translateFilterExpressionToJaninoExpression(
                         expression,
+                        columns,
                         Arrays.asList(
                                 new UserDefinedFunctionDescriptor(
                                         "format",
@@ -802,6 +842,7 @@ class TransformParserTest {
                                 new UserDefinedFunctionDescriptor(
                                         "typeof",
                                         
"org.apache.flink.cdc.udf.examples.java.TypeOfFunctionClass")),
+                        new SupportedMetadataColumn[0],
                         columnNameMap);
         Assertions.assertThat(janinoExpression).isEqualTo(expressionExpect);
     }

Reply via email to