This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 2fd03e683 [FLINK-36865][cdc] Provide UNIX_TIMESTAMP series functions in YAML pipeline
2fd03e683 is described below

commit 2fd03e683e1e2f1a8c7790e6bb1a168bc1ab9266
Author: Wink <809097...@qq.com>
AuthorDate: Mon Jan 13 16:18:15 2025 +0800

    [FLINK-36865][cdc] Provide UNIX_TIMESTAMP series functions in YAML pipeline
    
    This closes #3819.
---
 docs/content.zh/docs/core-concept/transform.md     |   3 +
 docs/content/docs/core-concept/transform.md        |   3 +
 .../flink/cdc/common/utils/DateTimeUtils.java      |  62 +++-
 .../cdc/runtime/functions/SystemFunctionUtils.java |  21 ++
 .../flink/cdc/runtime/parser/JaninoCompiler.java   |   9 +-
 .../parser/metadata/TransformSqlOperatorTable.java |  21 ++
 .../transform/PostTransformOperatorTest.java       | 350 +++++++++++++++++++++
 .../cdc/runtime/parser/TransformParserTest.java    |  11 +
 8 files changed, 477 insertions(+), 3 deletions(-)
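
For readers skimming the diff, here is a minimal, self-contained sketch of the behaviour the new functions are tested for further down. It is not the code from this commit: the class UnixTimeSketch and its two static helpers are hypothetical names, and it uses a fresh java.text.SimpleDateFormat per call instead of the cached formatters in DateTimeUtils. The inputs and the values in the comments are the ones asserted in PostTransformOperatorTest in this push.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import java.util.TimeZone;

// Hypothetical stand-in for the helpers added in this commit; not the project's code.
class UnixTimeSketch {

    /** Formats seconds since the epoch in the given zone (the FROM_UNIXTIME direction). */
    public static String fromUnixtime(long seconds, String format, String zoneId) {
        SimpleDateFormat formatter = new SimpleDateFormat(format, Locale.ROOT);
        formatter.setTimeZone(TimeZone.getTimeZone(zoneId));
        return formatter.format(new Date(seconds * 1000L));
    }

    /** Parses a date-time string to epoch seconds (the UNIX_TIMESTAMP direction). */
    public static long unixTimestamp(String dateStr, String format, String zoneId) {
        SimpleDateFormat parser = new SimpleDateFormat(format, Locale.ROOT);
        // The configured zone only applies when the parsed text carries no offset.
        parser.setTimeZone(TimeZone.getTimeZone(zoneId));
        try {
            return parser.parse(dateStr).getTime() / 1000L;
        } catch (ParseException e) {
            // Same sentinel the commit documents for unparseable input.
            return Long.MIN_VALUE;
        }
    }

    public static void main(String[] args) {
        String plain = "yyyy-MM-dd HH:mm:ss.SSS";
        String withOffset = "yyyy-MM-dd HH:mm:ss.SSS X";

        // The test expects 1970-01-01 01:00:44 for Berlin.
        System.out.println(fromUnixtime(44L, "yyyy-MM-dd HH:mm:ss", "Europe/Berlin"));
        // No offset in the string: the pipeline zone decides (test expects 25201).
        System.out.println(unixTimestamp("1970-01-01 08:00:01.001", plain, "Europe/Berlin"));
        // Offset in the string and in the pattern: the offset wins (test expects 1).
        System.out.println(
                unixTimestamp("1970-01-01 08:00:01.001 +0800", withOffset, "Europe/Berlin"));
        // Pattern demands an offset the string lacks: Long.MIN_VALUE.
        System.out.println(unixTimestamp("1970-01-01 08:00:01.001", withOffset, "Europe/Berlin"));
    }
}
```

The last call shows why downstream consumers may want to filter on the sentinel: a format mismatch does not fail the record, it yields Long.MIN_VALUE.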

diff --git a/docs/content.zh/docs/core-concept/transform.md b/docs/content.zh/docs/core-concept/transform.md
index 892243628..dfa90a728 100644
--- a/docs/content.zh/docs/core-concept/transform.md
+++ b/docs/content.zh/docs/core-concept/transform.md
@@ -160,6 +160,9 @@ Flink CDC uses [Calcite](https://calcite.apache.org/) to parse expressions and [
 | TIMESTAMPDIFF(timepointunit, timepoint1, timepoint2) | timestampDiff(timepointunit, timepoint1, timepoint2) | Returns the (signed) number of timepointunit between timepoint1 and timepoint2. The unit for the interval is given by the first argument, which should be one of the following values: SECOND, MINUTE, HOUR, DAY, MONTH, or YEAR. |
 | TO_DATE(string1[, string2]) | toDate(string1[, string2]) | Converts a date string string1 with format string2 (by default 'yyyy-MM-dd') to a date. |
 | TO_TIMESTAMP(string1[, string2]) | toTimestamp(string1[, string2]) | Converts date time string string1 with format string2 (by default: 'yyyy-MM-dd HH:mm:ss') to a timestamp, without time zone. |
+| FROM_UNIXTIME(numeric[, string]) | fromUnixtime(NUMERIC[, STRING]) | Returns a representation of the numeric argument as a value in string format (default is ‘yyyy-MM-dd HH:mm:ss’). numeric is an internal timestamp value representing seconds since ‘1970-01-01 00:00:00’ UTC, such as produced by the UNIX_TIMESTAMP() function. The return value is expressed in the session time zone (specified in TableConfig). E.g., FROM_UNIXTIME(44) returns ‘1970-01-01 00:00:44’ if in UTC time zone, but re [...]
+| UNIX_TIMESTAMP() | unixTimestamp() | Gets current Unix timestamp in seconds. This function is not deterministic which means the value would be recalculated for each record. |
+| UNIX_TIMESTAMP(string1[, string2]) | unixTimestamp(STRING1[, STRING2]) | Converts a date time string string1 with format string2 (by default: yyyy-MM-dd HH:mm:ss if not specified) to Unix timestamp (in seconds), using the specified timezone in table config.<br/>If a time zone is specified in the date time string and parsed by UTC+X format such as “yyyy-MM-dd HH:mm:ss.SSS X”, this function will use the specified timezone in the date time string instead of the timezone in table config. I [...]
 
 ## Conditional Functions
 
diff --git a/docs/content/docs/core-concept/transform.md b/docs/content/docs/core-concept/transform.md
index b4979beca..67eeaf2d8 100644
--- a/docs/content/docs/core-concept/transform.md
+++ b/docs/content/docs/core-concept/transform.md
@@ -160,6 +160,9 @@ Flink CDC uses [Calcite](https://calcite.apache.org/) to parse expressions and [
 | TIMESTAMPDIFF(timepointunit, timepoint1, timepoint2) | timestampDiff(timepointunit, timepoint1, timepoint2) | Returns the (signed) number of timepointunit between timepoint1 and timepoint2. The unit for the interval is given by the first argument, which should be one of the following values: SECOND, MINUTE, HOUR, DAY, MONTH, or YEAR. |
 | TO_DATE(string1[, string2]) | toDate(string1[, string2]) | Converts a date string string1 with format string2 (by default 'yyyy-MM-dd') to a date. |
 | TO_TIMESTAMP(string1[, string2]) | toTimestamp(string1[, string2]) | Converts date time string string1 with format string2 (by default: 'yyyy-MM-dd HH:mm:ss') to a timestamp, without time zone. |
+| FROM_UNIXTIME(numeric[, string]) | fromUnixtime(NUMERIC[, STRING]) | Returns a representation of the numeric argument as a value in string format (default is ‘yyyy-MM-dd HH:mm:ss’). numeric is an internal timestamp value representing seconds since ‘1970-01-01 00:00:00’ UTC, such as produced by the UNIX_TIMESTAMP() function. The return value is expressed in the session time zone (specified in TableConfig). E.g., FROM_UNIXTIME(44) returns ‘1970-01-01 00:00:44’ if in UTC time zone, but re [...]
+| UNIX_TIMESTAMP() | unixTimestamp() | Gets current Unix timestamp in seconds. This function is not deterministic which means the value would be recalculated for each record. |
+| UNIX_TIMESTAMP(string1[, string2]) | unixTimestamp(STRING1[, STRING2]) | Converts a date time string string1 with format string2 (by default: yyyy-MM-dd HH:mm:ss if not specified) to Unix timestamp (in seconds), using the specified timezone in table config.<br/>If a time zone is specified in the date time string and parsed by UTC+X format such as “yyyy-MM-dd HH:mm:ss.SSS X”, this function will use the specified timezone in the date time string instead of the timezone in table config. I [...]
 
 ## Conditional Functions
 
diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/DateTimeUtils.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/DateTimeUtils.java
index b923107e2..1fb080de9 100644
--- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/DateTimeUtils.java
+++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/DateTimeUtils.java
@@ -43,6 +43,16 @@ public class DateTimeUtils {
      */
     public static final long MILLIS_PER_DAY = 86400000L; // = 24 * 60 * 60 * 1000
 
+    /** The SimpleDateFormat string for ISO dates, "yyyy-MM-dd". */
+    private static final String DATE_FORMAT_STRING = "yyyy-MM-dd";
+
+    /** The SimpleDateFormat string for ISO times, "HH:mm:ss". */
+    private static final String TIME_FORMAT_STRING = "HH:mm:ss";
+
+    /** The SimpleDateFormat string for ISO timestamps, "yyyy-MM-dd HH:mm:ss". */
+    private static final String TIMESTAMP_FORMAT_STRING =
+            DATE_FORMAT_STRING + " " + TIME_FORMAT_STRING;
+
     /**
      * A ThreadLocal cache map for SimpleDateFormat, because SimpleDateFormat is not thread-safe.
      * (string_format) => formatter
@@ -109,7 +119,7 @@ public class DateTimeUtils {
         } catch (ParseException e) {
             LOG.error(
                     String.format(
-                            "Exception when parsing datetime string '%s' in format '%s'",
+                            "Exception when parsing datetime string '%s' in format '%s', the default value Long.MIN_VALUE(-9223372036854775808) will be returned.",
                             dateStr, format),
                     e);
             return Long.MIN_VALUE;
@@ -128,6 +138,56 @@ public class DateTimeUtils {
         return day + (153 * m + 2) / 5 + 365 * y + y / 4 - y / 100 + y / 400 - 32045;
     }
 
+    // --------------------------------------------------------------------------------------------
+    // UNIX TIME
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * Convert unix timestamp (seconds since '1970-01-01 00:00:00' UTC) to datetime string in the
+     * "yyyy-MM-dd HH:mm:ss" format.
+     */
+    public static String formatUnixTimestamp(long unixTime, TimeZone timeZone) {
+        return formatUnixTimestamp(unixTime, TIMESTAMP_FORMAT_STRING, timeZone);
+    }
+
+    /**
+     * Convert unix timestamp (seconds since '1970-01-01 00:00:00' UTC) to datetime string in the
+     * given format.
+     */
+    public static String formatUnixTimestamp(long unixTime, String format, TimeZone timeZone) {
+        SimpleDateFormat formatter = FORMATTER_CACHE.get(format);
+        formatter.setTimeZone(timeZone);
+        Date date = new Date(unixTime * 1000);
+        try {
+            return formatter.format(date);
+        } catch (Exception e) {
+            LOG.error("Exception when formatting.", e);
+            return null;
+        }
+    }
+
+    /**
+     * Returns the value of the argument as an unsigned integer in seconds since '1970-01-01
+     * 00:00:00' UTC.
+     */
+    public static long unixTimestamp(String dateStr, TimeZone timeZone) {
+        return unixTimestamp(dateStr, TIMESTAMP_FORMAT_STRING, timeZone);
+    }
+
+    /**
+     * Returns the value of the argument as an unsigned integer in seconds since '1970-01-01
+     * 00:00:00' UTC.
+     */
+    public static long unixTimestamp(String dateStr, String format, TimeZone timeZone) {
+        long ts = internalParseTimestampMillis(dateStr, format, timeZone);
+        if (ts == Long.MIN_VALUE) {
+            return Long.MIN_VALUE;
+        } else {
+            // return the seconds
+            return ts / 1000;
+        }
+    }
+
     // --------------------------------------------------------------------------------------------
     // Format
     // --------------------------------------------------------------------------------------------
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java
index 6f6d52a3b..a7555203c 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java
@@ -80,6 +80,27 @@ public class SystemFunctionUtils {
         return timestampMillisToDate(localtimestamp(epochTime, timezone).getMillisecond());
     }
 
+    public static String fromUnixtime(long seconds, String timezone) {
+        return DateTimeUtils.formatUnixTimestamp(seconds, TimeZone.getTimeZone(timezone));
+    }
+
+    public static String fromUnixtime(long seconds, String format, String timezone) {
+        return DateTimeUtils.formatUnixTimestamp(seconds, format, TimeZone.getTimeZone(timezone));
+    }
+
+    public static long unixTimestamp(long epochTime, String timezone) {
+        return epochTime / 1000;
+    }
+
+    public static long unixTimestamp(String dateTimeStr, long epochTime, String timezone) {
+        return DateTimeUtils.unixTimestamp(dateTimeStr, TimeZone.getTimeZone(timezone));
+    }
+
+    public static long unixTimestamp(
+            String dateTimeStr, String format, long epochTime, String timezone) {
+        return DateTimeUtils.unixTimestamp(dateTimeStr, format, TimeZone.getTimeZone(timezone));
+    }
+
     public static String dateFormat(TimestampData timestamp, String format) {
         return DateTimeUtils.formatTimestampMillis(
                 timestamp.getMillisecond(), format, TimeZone.getTimeZone("UTC"));
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java
index 6f5b26125..f60bf968c 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java
@@ -57,13 +57,18 @@ public class JaninoCompiler {
             Arrays.asList("CURRENT_TIMESTAMP", "NOW");
 
     private static final List<String> TIMEZONE_REQUIRED_TEMPORAL_FUNCTIONS =
-            Arrays.asList("LOCALTIME", "LOCALTIMESTAMP", "CURRENT_TIME", "CURRENT_DATE");
+            Arrays.asList(
+                    "LOCALTIME",
+                    "LOCALTIMESTAMP",
+                    "CURRENT_TIME",
+                    "CURRENT_DATE",
+                    "UNIX_TIMESTAMP");
 
     private static final List<String> TIMEZONE_FREE_TEMPORAL_CONVERSION_FUNCTIONS =
             Arrays.asList("DATE_FORMAT");
 
     private static final List<String> TIMEZONE_REQUIRED_TEMPORAL_CONVERSION_FUNCTIONS =
-            Arrays.asList("TO_DATE", "TO_TIMESTAMP");
+            Arrays.asList("TO_DATE", "TO_TIMESTAMP", "FROM_UNIXTIME");
 
     public static final String DEFAULT_EPOCH_TIME = "__epoch_time__";
     public static final String DEFAULT_TIME_ZONE = "__time_zone__";
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java
index bb2c3503d..d47db49f8 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java
@@ -212,6 +212,27 @@ public class TransformSqlOperatorTable extends ReflectiveSqlOperatorTable {
                     return SqlSyntax.FUNCTION;
                 }
             };
+    public static final SqlFunction UNIX_TIMESTAMP =
+            new SqlFunction(
+                    "UNIX_TIMESTAMP",
+                    SqlKind.OTHER_FUNCTION,
+                    ReturnTypes.BIGINT_NULLABLE,
+                    null,
+                    OperandTypes.or(
+                            OperandTypes.NILADIC,
+                            OperandTypes.family(SqlTypeFamily.CHARACTER),
+                            OperandTypes.family(SqlTypeFamily.CHARACTER, SqlTypeFamily.CHARACTER)),
+                    SqlFunctionCategory.TIMEDATE);
+    public static final SqlFunction FROM_UNIXTIME =
+            new SqlFunction(
+                    "FROM_UNIXTIME",
+                    SqlKind.OTHER_FUNCTION,
+                    TransformSqlReturnTypes.VARCHAR_FORCE_NULLABLE,
+                    null,
+                    OperandTypes.or(
+                            OperandTypes.family(SqlTypeFamily.INTEGER),
+                            OperandTypes.family(SqlTypeFamily.INTEGER, SqlTypeFamily.CHARACTER)),
+                    SqlFunctionCategory.TIMEDATE);
     public static final SqlFunction DATE_FORMAT =
             new SqlFunction(
                     "DATE_FORMAT",
diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java
index 2de893a09..f30c19946 100644
--- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java
+++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java
@@ -113,6 +113,40 @@ public class PostTransformOperatorTest {
                     .primaryKey("col1")
                     .build();
 
+    private static final TableId FROM_UNIX_TIME_TABLEID =
+            TableId.tableId("my_company", "my_branch", "from_unix_time_table");
+    private static final Schema FROM_UNIX_TIME_SCHEMA =
+            Schema.newBuilder()
+                    .physicalColumn("col1", DataTypes.STRING().notNull())
+                    .physicalColumn("seconds", DataTypes.BIGINT())
+                    .physicalColumn("format_str", DataTypes.STRING())
+                    .primaryKey("col1")
+                    .build();
+    private static final Schema EXPECTED_FROM_UNIX_TIME_SCHEMA =
+            Schema.newBuilder()
+                    .physicalColumn("col1", DataTypes.STRING().notNull())
+                    .physicalColumn("from_unix_time", DataTypes.STRING())
+                    .physicalColumn("from_unix_time_format", DataTypes.STRING())
+                    .primaryKey("col1")
+                    .build();
+
+    private static final TableId UNIX_TIMESTAMP_TABLEID =
+            TableId.tableId("my_company", "my_branch", "unix_timestamp_table");
+    private static final Schema UNIX_TIMESTAMP_SCHEMA =
+            Schema.newBuilder()
+                    .physicalColumn("col1", DataTypes.STRING().notNull())
+                    .physicalColumn("date_time_str", DataTypes.STRING())
+                    .physicalColumn("unix_timestamp_format", DataTypes.STRING())
+                    .primaryKey("col1")
+                    .build();
+    private static final Schema EXPECTED_UNIX_TIMESTAMP_SCHEMA =
+            Schema.newBuilder()
+                    .physicalColumn("col1", DataTypes.STRING().notNull())
+                    .physicalColumn("unix_timestamp", DataTypes.BIGINT())
+                    .physicalColumn("unix_timestamp_format", DataTypes.BIGINT())
+                    .primaryKey("col1")
+                    .build();
+
     private static final TableId TIMESTAMPDIFF_TABLEID =
             TableId.tableId("my_company", "my_branch", "timestampdiff_table");
     private static final Schema TIMESTAMPDIFF_SCHEMA =
@@ -799,6 +833,322 @@ public class PostTransformOperatorTest {
         transformFunctionEventEventOperatorTestHarness.close();
     }
 
+    @Test
+    void testFromUnixTimeTransform() throws Exception {
+        // In UTC, from_unix_time(0s) ==> 1970-01-01 00:00:00
+        testFromUnixTimeTransformWithTimeZone("UTC", 0L, "1970-01-01 00:00:00");
+        // In UTC, from_unix_time(44s) ==> 1970-01-01 00:00:44
+        testFromUnixTimeTransformWithTimeZone("UTC", 44L, "1970-01-01 00:00:44");
+        // In Berlin, the time zone is +1:00, from_unix_time(44s) ==> 1970-01-01 01:00:44
+        testFromUnixTimeTransformWithTimeZone("Europe/Berlin", 44L, "1970-01-01 01:00:44");
+        // In Shanghai, the time zone is +8:00, from_unix_time(44s) ==> 1970-01-01 08:00:44
+        testFromUnixTimeTransformWithTimeZone("Asia/Shanghai", 44L, "1970-01-01 08:00:44");
+    }
+
+    private void testFromUnixTimeTransformWithTimeZone(
+            String timeZone, Long seconds, String unixTimeStr) throws Exception {
+        PostTransformOperator transform =
+                PostTransformOperator.newBuilder()
+                        .addTransform(
+                                FROM_UNIX_TIME_TABLEID.identifier(),
+                                "col1, FROM_UNIXTIME(seconds) as from_unix_time,"
+                                        + " FROM_UNIXTIME(seconds, format_str) as from_unix_time_format",
+                                null)
+                        .addTimezone(timeZone)
+                        .build();
+        RegularEventOperatorTestHarness<PostTransformOperator, Event>
+                transformFunctionEventEventOperatorTestHarness =
+                        RegularEventOperatorTestHarness.with(transform, 1);
+        // Initialization
+        transformFunctionEventEventOperatorTestHarness.open();
+        // Create table
+        CreateTableEvent createTableEvent =
+                new CreateTableEvent(FROM_UNIX_TIME_TABLEID, FROM_UNIX_TIME_SCHEMA);
+        BinaryRecordDataGenerator recordDataGenerator =
+                new BinaryRecordDataGenerator(((RowType) FROM_UNIX_TIME_SCHEMA.toRowDataType()));
+        BinaryRecordDataGenerator expectedRecordDataGenerator =
+                new BinaryRecordDataGenerator(
+                        ((RowType) EXPECTED_FROM_UNIX_TIME_SCHEMA.toRowDataType()));
+        DataChangeEvent insertEvent =
+                DataChangeEvent.insertEvent(
+                        FROM_UNIX_TIME_TABLEID,
+                        recordDataGenerator.generate(
+                                new Object[] {
+                                    new BinaryStringData("1"),
+                                    seconds,
+                                    new BinaryStringData("yyyy-MM-dd HH:mm:ss")
+                                }));
+        DataChangeEvent insertEventExpect =
+                DataChangeEvent.insertEvent(
+                        FROM_UNIX_TIME_TABLEID,
+                        expectedRecordDataGenerator.generate(
+                                new Object[] {
+                                    new BinaryStringData("1"),
+                                    new BinaryStringData(unixTimeStr),
+                                    new BinaryStringData(unixTimeStr)
+                                }));
+        transform.processElement(new StreamRecord<>(createTableEvent));
+        Assertions.assertThat(
+                        transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+                .isEqualTo(
+                        new StreamRecord<>(
+                                new CreateTableEvent(
+                                        FROM_UNIX_TIME_TABLEID, EXPECTED_FROM_UNIX_TIME_SCHEMA)));
+        transform.processElement(new StreamRecord<>(insertEvent));
+        Assertions.assertThat(
+                        transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+                .isEqualTo(new StreamRecord<>(insertEventExpect));
+        transformFunctionEventEventOperatorTestHarness.close();
+    }
+
+    /*
+    Converts a date time string string1 with format string2 (by default: yyyy-MM-dd HH:mm:ss if not specified) to Unix timestamp (in seconds),
+    using the specified timezone in table config.
+
+    If a time zone is specified in the date time string and parsed by UTC+X format such as “yyyy-MM-dd HH:mm:ss.SSS X”,
+    this function will use the specified timezone in the date time string instead of the timezone in table config. If the date time string can not be parsed,
+    the default value Long.MIN_VALUE(-9223372036854775808) will be returned.
+    */
+    @Test
+    void testUnixTimestampTransformInBerlin() throws Exception {
+        PostTransformOperator transform =
+                PostTransformOperator.newBuilder()
+                        .addTransform(
+                                UNIX_TIMESTAMP_TABLEID.identifier(),
+                                "col1,"
+                                        + " UNIX_TIMESTAMP(date_time_str) as unix_timestamp,"
+                                        + " UNIX_TIMESTAMP(date_time_str, unix_timestamp_format) as unix_timestamp_format",
+                                null)
+                        .addTimezone("Europe/Berlin")
+                        .build();
+        RegularEventOperatorTestHarness<PostTransformOperator, Event>
+                transformFunctionEventEventOperatorTestHarness =
+                        RegularEventOperatorTestHarness.with(transform, 1);
+        // Initialization
+        transformFunctionEventEventOperatorTestHarness.open();
+        // Create table
+        CreateTableEvent createTableEvent =
+                new CreateTableEvent(UNIX_TIMESTAMP_TABLEID, UNIX_TIMESTAMP_SCHEMA);
+        BinaryRecordDataGenerator recordDataGenerator =
+                new BinaryRecordDataGenerator(((RowType) UNIX_TIMESTAMP_SCHEMA.toRowDataType()));
+        BinaryRecordDataGenerator expectedRecordDataGenerator =
+                new BinaryRecordDataGenerator(
+                        ((RowType) EXPECTED_UNIX_TIMESTAMP_SCHEMA.toRowDataType()));
+        transform.processElement(new StreamRecord<>(createTableEvent));
+        Assertions.assertThat(
+                        transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+                .isEqualTo(
+                        new StreamRecord<>(
+                                new CreateTableEvent(
+                                        UNIX_TIMESTAMP_TABLEID, EXPECTED_UNIX_TIMESTAMP_SCHEMA)));
+
+        // In Berlin, "1970-01-01 08:00:01.001" formatted by "yyyy-MM-dd HH:mm:ss.SSS" ==> 25201L
+        DataChangeEvent insertEvent1 =
+                DataChangeEvent.insertEvent(
+                        UNIX_TIMESTAMP_TABLEID,
+                        recordDataGenerator.generate(
+                                new Object[] {
+                                    new BinaryStringData("1"),
+                                    new BinaryStringData("1970-01-01 08:00:01.001"),
+                                    new BinaryStringData("yyyy-MM-dd HH:mm:ss.SSS")
+                                }));
+        DataChangeEvent insertEventExpect1 =
+                DataChangeEvent.insertEvent(
+                        UNIX_TIMESTAMP_TABLEID,
+                        expectedRecordDataGenerator.generate(
+                                new Object[] {new BinaryStringData("1"), 25201L, 25201L}));
+        transform.processElement(new StreamRecord<>(insertEvent1));
+        Assertions.assertThat(
+                        transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+                .isEqualTo(new StreamRecord<>(insertEventExpect1));
+
+        // In Berlin, "1970-01-01 08:00:01.001 +0800" formatted by "yyyy-MM-dd HH:mm:ss.SSS X" ==>
+        // 1L
+        DataChangeEvent insertEvent2 =
+                DataChangeEvent.insertEvent(
+                        UNIX_TIMESTAMP_TABLEID,
+                        recordDataGenerator.generate(
+                                new Object[] {
+                                    new BinaryStringData("2"),
+                                    new BinaryStringData("1970-01-01 08:00:01.001 +0800"),
+                                    new BinaryStringData("yyyy-MM-dd HH:mm:ss.SSS X")
+                                }));
+        DataChangeEvent insertEventExpect2 =
+                DataChangeEvent.insertEvent(
+                        UNIX_TIMESTAMP_TABLEID,
+                        expectedRecordDataGenerator.generate(
+                                new Object[] {new BinaryStringData("2"), 25201L, 1L}));
+        transform.processElement(new StreamRecord<>(insertEvent2));
+        Assertions.assertThat(
+                        transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+                .isEqualTo(new StreamRecord<>(insertEventExpect2));
+
+        // In Berlin, "1970-01-01 08:00:01.001 +0800" formatted by "yyyy-MM-dd HH:mm:ss.SSS" ==>
+        // 25201L
+        DataChangeEvent insertEvent3 =
+                DataChangeEvent.insertEvent(
+                        UNIX_TIMESTAMP_TABLEID,
+                        recordDataGenerator.generate(
+                                new Object[] {
+                                    new BinaryStringData("3"),
+                                    new BinaryStringData("1970-01-01 08:00:01.001 +0800"),
+                                    new BinaryStringData("yyyy-MM-dd HH:mm:ss.SSS")
+                                }));
+        DataChangeEvent insertEventExpect3 =
+                DataChangeEvent.insertEvent(
+                        UNIX_TIMESTAMP_TABLEID,
+                        expectedRecordDataGenerator.generate(
+                                new Object[] {new BinaryStringData("3"), 25201L, 25201L}));
+        transform.processElement(new StreamRecord<>(insertEvent3));
+        Assertions.assertThat(
+                        transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+                .isEqualTo(new StreamRecord<>(insertEventExpect3));
+
+        // In Berlin, "1970-01-01 08:00:01.001" formatted by "yyyy-MM-dd HH:mm:ss.SSS X" ==>
+        // -9223372036854775808L
+        DataChangeEvent insertEvent4 =
+                DataChangeEvent.insertEvent(
+                        UNIX_TIMESTAMP_TABLEID,
+                        recordDataGenerator.generate(
+                                new Object[] {
+                                    new BinaryStringData("4"),
+                                    new BinaryStringData("1970-01-01 08:00:01.001"),
+                                    new BinaryStringData("yyyy-MM-dd HH:mm:ss.SSS X")
+                                }));
+        DataChangeEvent insertEventExpect4 =
+                DataChangeEvent.insertEvent(
+                        UNIX_TIMESTAMP_TABLEID,
+                        expectedRecordDataGenerator.generate(
+                                new Object[] {
+                                    new BinaryStringData("4"), 25201L, -9223372036854775808L
+                                }));
+        transform.processElement(new StreamRecord<>(insertEvent4));
+        Assertions.assertThat(
+                        transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+                .isEqualTo(new StreamRecord<>(insertEventExpect4));
+        transformFunctionEventEventOperatorTestHarness.close();
+    }
+
+    @Test
+    void testUnixTimestampTransformInShanghai() throws Exception {
+        PostTransformOperator transform =
+                PostTransformOperator.newBuilder()
+                        .addTransform(
+                                UNIX_TIMESTAMP_TABLEID.identifier(),
+                                "col1,"
+                                        + " UNIX_TIMESTAMP(date_time_str) as unix_timestamp,"
+                                        + " UNIX_TIMESTAMP(date_time_str, unix_timestamp_format) as unix_timestamp_format",
+                                null)
+                        .addTimezone("Asia/Shanghai")
+                        .build();
+        RegularEventOperatorTestHarness<PostTransformOperator, Event>
+                transformFunctionEventEventOperatorTestHarness =
+                        RegularEventOperatorTestHarness.with(transform, 1);
+        // Initialization
+        transformFunctionEventEventOperatorTestHarness.open();
+        // Create table
+        CreateTableEvent createTableEvent =
+                new CreateTableEvent(UNIX_TIMESTAMP_TABLEID, UNIX_TIMESTAMP_SCHEMA);
+        BinaryRecordDataGenerator recordDataGenerator =
+                new BinaryRecordDataGenerator(((RowType) UNIX_TIMESTAMP_SCHEMA.toRowDataType()));
+        BinaryRecordDataGenerator expectedRecordDataGenerator =
+                new BinaryRecordDataGenerator(
+                        ((RowType) EXPECTED_UNIX_TIMESTAMP_SCHEMA.toRowDataType()));
+        transform.processElement(new StreamRecord<>(createTableEvent));
+        Assertions.assertThat(
+                        transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+                .isEqualTo(
+                        new StreamRecord<>(
+                                new CreateTableEvent(
+                                        UNIX_TIMESTAMP_TABLEID, EXPECTED_UNIX_TIMESTAMP_SCHEMA)));
+
+        // In Shanghai, "1970-01-01 08:00:01.001" formatted by "yyyy-MM-dd HH:mm:ss.SSS" ==> 1L
+        DataChangeEvent insertEvent1 =
+                DataChangeEvent.insertEvent(
+                        UNIX_TIMESTAMP_TABLEID,
+                        recordDataGenerator.generate(
+                                new Object[] {
+                                    new BinaryStringData("1"),
+                                    new BinaryStringData("1970-01-01 
08:00:01.001"),
+                                    new BinaryStringData("yyyy-MM-dd 
HH:mm:ss.SSS")
+                                }));
+        DataChangeEvent insertEventExpect1 =
+                DataChangeEvent.insertEvent(
+                        UNIX_TIMESTAMP_TABLEID,
+                        expectedRecordDataGenerator.generate(
+                                new Object[] {new BinaryStringData("1"), 1L, 
1L}));
+        transform.processElement(new StreamRecord<>(insertEvent1));
+        Assertions.assertThat(
+                        
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+                .isEqualTo(new StreamRecord<>(insertEventExpect1));
+
+        // In Shanghai, "1970-01-01 08:00:01.001 +0100" formatted by "yyyy-MM-dd HH:mm:ss.SSS X" ==>
+        // 25201L
+        DataChangeEvent insertEvent2 =
+                DataChangeEvent.insertEvent(
+                        UNIX_TIMESTAMP_TABLEID,
+                        recordDataGenerator.generate(
+                                new Object[] {
+                                    new BinaryStringData("2"),
+                                    new BinaryStringData("1970-01-01 08:00:01.001 +0100"),
+                                    new BinaryStringData("yyyy-MM-dd HH:mm:ss.SSS X")
+                                }));
+        DataChangeEvent insertEventExpect2 =
+                DataChangeEvent.insertEvent(
+                        UNIX_TIMESTAMP_TABLEID,
+                        expectedRecordDataGenerator.generate(
+                                new Object[] {new BinaryStringData("2"), 1L, 25201L}));
+        transform.processElement(new StreamRecord<>(insertEvent2));
+        Assertions.assertThat(
+                        transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+                .isEqualTo(new StreamRecord<>(insertEventExpect2));
+
+        // In Shanghai, "1970-01-01 08:00:01.001 +0100" formatted by "yyyy-MM-dd HH:mm:ss.SSS" ==>
+        // 1L
+        DataChangeEvent insertEvent3 =
+                DataChangeEvent.insertEvent(
+                        UNIX_TIMESTAMP_TABLEID,
+                        recordDataGenerator.generate(
+                                new Object[] {
+                                    new BinaryStringData("3"),
+                                    new BinaryStringData("1970-01-01 08:00:01.001 +0100"),
+                                    new BinaryStringData("yyyy-MM-dd HH:mm:ss.SSS")
+                                }));
+        DataChangeEvent insertEventExpect3 =
+                DataChangeEvent.insertEvent(
+                        UNIX_TIMESTAMP_TABLEID,
+                        expectedRecordDataGenerator.generate(
+                                new Object[] {new BinaryStringData("3"), 1L, 1L}));
+        transform.processElement(new StreamRecord<>(insertEvent3));
+        Assertions.assertThat(
+                        transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+                .isEqualTo(new StreamRecord<>(insertEventExpect3));
+
+        // In Shanghai, "1970-01-01 08:00:01.001" formatted by "yyyy-MM-dd HH:mm:ss.SSS X" ==>
+        // -9223372036854775808L
+        DataChangeEvent insertEvent4 =
+                DataChangeEvent.insertEvent(
+                        UNIX_TIMESTAMP_TABLEID,
+                        recordDataGenerator.generate(
+                                new Object[] {
+                                    new BinaryStringData("4"),
+                                    new BinaryStringData("1970-01-01 08:00:01.001"),
+                                    new BinaryStringData("yyyy-MM-dd HH:mm:ss.SSS X")
+                                }));
+        DataChangeEvent insertEventExpect4 =
+                DataChangeEvent.insertEvent(
+                        UNIX_TIMESTAMP_TABLEID,
+                        expectedRecordDataGenerator.generate(
+                                new Object[] {
+                                    new BinaryStringData("4"), 1L, -9223372036854775808L
+                                }));
+        transform.processElement(new StreamRecord<>(insertEvent4));
+        Assertions.assertThat(
+                        transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+                .isEqualTo(new StreamRecord<>(insertEventExpect4));
+        transformFunctionEventEventOperatorTestHarness.close();
+    }
+
     @Test
     void testTimestampDiffTransform() throws Exception {
         PostTransformOperator transform =
diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java
index bc5d12005..478d4e92e 100644
--- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java
+++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java
@@ -243,6 +243,17 @@ public class TransformParserTest {
         testFilterExpression(
                 "id = CURRENT_TIMESTAMP", "valueEquals(id, currentTimestamp(__epoch_time__))");
         testFilterExpression("NOW()", "now(__epoch_time__)");
+        testFilterExpression("FROM_UNIXTIME(44)", "fromUnixtime(44, __time_zone__)");
+        testFilterExpression(
+                "FROM_UNIXTIME(44, 'yyyy/MM/dd HH:mm:ss')",
+                "fromUnixtime(44, \"yyyy/MM/dd HH:mm:ss\", __time_zone__)");
+        testFilterExpression("UNIX_TIMESTAMP()", "unixTimestamp(__epoch_time__, __time_zone__)");
+        testFilterExpression(
+                "UNIX_TIMESTAMP('1970-01-01 08:00:01')",
+                "unixTimestamp(\"1970-01-01 08:00:01\", __epoch_time__, __time_zone__)");
+        testFilterExpression(
+                "UNIX_TIMESTAMP('1970-01-01 08:00:01.001 +0800', 'yyyy-MM-dd HH:mm:ss.SSS X')",
+                "unixTimestamp(\"1970-01-01 08:00:01.001 +0800\", \"yyyy-MM-dd HH:mm:ss.SSS X\", __epoch_time__, __time_zone__)");
         testFilterExpression("YEAR(dt)", "year(dt)");
         testFilterExpression("QUARTER(dt)", "quarter(dt)");
         testFilterExpression("MONTH(dt)", "month(dt)");
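
Note on the expected values in the UNIX_TIMESTAMP test above: the four cases (1L, 25201L, 1L, Long.MIN_VALUE) can be reproduced with a small standalone sketch. This is not the DateTimeUtils code from this commit. It assumes parsing behaves like java.text.SimpleDateFormat with the pipeline time zone as the default zone, and that a parse failure maps to Long.MIN_VALUE. The class name UnixTimestampSketch and the three-argument unixTimestamp helper are hypothetical.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.TimeZone;

class UnixTimestampSketch {

    // Hypothetical helper: parse dateStr with the given pattern, falling back to
    // `zone` when the pattern carries no offset, and return epoch seconds.
    // Assumption: an unparseable input yields Long.MIN_VALUE, as the test expects.
    static long unixTimestamp(String dateStr, String format, TimeZone zone) {
        SimpleDateFormat formatter = new SimpleDateFormat(format);
        formatter.setTimeZone(zone);
        try {
            // parse(String) may stop before the end of the input, so trailing
            // text such as " +0100" is ignored when the pattern has no "X".
            return formatter.parse(dateStr).getTime() / 1000;
        } catch (ParseException e) {
            return Long.MIN_VALUE;
        }
    }

    public static void main(String[] args) {
        TimeZone shanghai = TimeZone.getTimeZone("Asia/Shanghai");
        String plain = "yyyy-MM-dd HH:mm:ss.SSS";
        String withOffset = "yyyy-MM-dd HH:mm:ss.SSS X";

        // 08:00:01.001 at UTC+8 is 00:00:01.001 UTC, i.e. 1 second.
        System.out.println(unixTimestamp("1970-01-01 08:00:01.001", plain, shanghai));
        // The explicit +01 offset wins: 07:00:01.001 UTC, i.e. 25201 seconds.
        System.out.println(unixTimestamp("1970-01-01 08:00:01.001 +0100", withOffset, shanghai));
        // No "X" in the pattern, so the offset text is ignored: 1 second.
        System.out.println(unixTimestamp("1970-01-01 08:00:01.001 +0100", plain, shanghai));
        // The pattern requires an offset that the input lacks: Long.MIN_VALUE.
        System.out.println(unixTimestamp("1970-01-01 08:00:01.001", withOffset, shanghai));
    }
}
```

The second case is the one that depends on the offset in the input: 7 * 3600 + 1 = 25201 seconds, regardless of the Shanghai default zone.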

