This is an automated email from the ASF dual-hosted git repository. leonard pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push: new 2fd03e683 [FLINK-36865][cdc] Provide UNIX_TIMESTAMP series functions in YAML pipeline 2fd03e683 is described below commit 2fd03e683e1e2f1a8c7790e6bb1a168bc1ab9266 Author: Wink <809097...@qq.com> AuthorDate: Mon Jan 13 16:18:15 2025 +0800 [FLINK-36865][cdc] Provide UNIX_TIMESTAMP series functions in YAML pipeline This closes #3819. --- docs/content.zh/docs/core-concept/transform.md | 3 + docs/content/docs/core-concept/transform.md | 3 + .../flink/cdc/common/utils/DateTimeUtils.java | 62 +++- .../cdc/runtime/functions/SystemFunctionUtils.java | 21 ++ .../flink/cdc/runtime/parser/JaninoCompiler.java | 9 +- .../parser/metadata/TransformSqlOperatorTable.java | 21 ++ .../transform/PostTransformOperatorTest.java | 350 +++++++++++++++++++++ .../cdc/runtime/parser/TransformParserTest.java | 11 + 8 files changed, 477 insertions(+), 3 deletions(-) diff --git a/docs/content.zh/docs/core-concept/transform.md b/docs/content.zh/docs/core-concept/transform.md index 892243628..dfa90a728 100644 --- a/docs/content.zh/docs/core-concept/transform.md +++ b/docs/content.zh/docs/core-concept/transform.md @@ -160,6 +160,9 @@ Flink CDC uses [Calcite](https://calcite.apache.org/) to parse expressions and [ | TIMESTAMPDIFF(timepointunit, timepoint1, timepoint2) | timestampDiff(timepointunit, timepoint1, timepoint2) | Returns the (signed) number of timepointunit between timepoint1 and timepoint2. The unit for the interval is given by the first argument, which should be one of the following values: SECOND, MINUTE, HOUR, DAY, MONTH, or YEAR. | | TO_DATE(string1[, string2]) | toDate(string1[, string2]) | Converts a date string string1 with format string2 (by default 'yyyy-MM-dd') to a date. | | TO_TIMESTAMP(string1[, string2]) | toTimestamp(string1[, string2]) | Converts date time string string1 with format string2 (by default: 'yyyy-MM-dd HH:mm:ss') to a timestamp, without time zone. 
| +| FROM_UNIXTIME(numeric[, string]) | fromUnixtime(NUMERIC[, STRING]) | Returns a representation of the numeric argument as a value in string format (default is 'yyyy-MM-dd HH:mm:ss'). numeric is an integer timestamp value representing seconds since '1970-01-01 00:00:00' UTC, such as produced by the UNIX_TIMESTAMP() function. The return value is expressed in the pipeline's local time zone. E.g., FROM_UNIXTIME(44) returns '1970-01-01 00:00:44' if in UTC time zone, but returns '1970-01-01 08:00:44' if in 'Asia/Shanghai' time zone. | +| UNIX_TIMESTAMP() | unixTimestamp() | Gets current Unix timestamp in seconds. This function is not deterministic, which means the value is recalculated for each record. | +| UNIX_TIMESTAMP(string1[, string2]) | unixTimestamp(STRING1[, STRING2]) | Converts a date time string string1 with format string2 (by default: yyyy-MM-dd HH:mm:ss if not specified) to Unix timestamp (in seconds), using the pipeline's local time zone.<br/>If a time zone is specified in the date time string and parsed by a UTC+X pattern such as 'yyyy-MM-dd HH:mm:ss.SSS X', this function uses the time zone from the date time string instead of the pipeline's local time zone. If the date time string cannot be parsed, the default value Long.MIN_VALUE (-9223372036854775808) is returned. | ## Conditional Functions diff --git a/docs/content/docs/core-concept/transform.md b/docs/content/docs/core-concept/transform.md index b4979beca..67eeaf2d8 100644 --- a/docs/content/docs/core-concept/transform.md +++ b/docs/content/docs/core-concept/transform.md @@ -160,6 +160,9 @@ Flink CDC uses [Calcite](https://calcite.apache.org/) to parse expressions and [ | TIMESTAMPDIFF(timepointunit, timepoint1, timepoint2) | timestampDiff(timepointunit, timepoint1, timepoint2) | Returns the (signed) number of timepointunit between timepoint1 and timepoint2. The unit for the interval is given by the first argument, which should be one of the following values: SECOND, MINUTE, HOUR, DAY, MONTH, or YEAR.
| | TO_DATE(string1[, string2]) | toDate(string1[, string2]) | Converts a date string string1 with format string2 (by default 'yyyy-MM-dd') to a date. | | TO_TIMESTAMP(string1[, string2]) | toTimestamp(string1[, string2]) | Converts date time string string1 with format string2 (by default: 'yyyy-MM-dd HH:mm:ss') to a timestamp, without time zone. | +| FROM_UNIXTIME(numeric[, string]) | fromUnixtime(NUMERIC[, STRING]) | Returns a representation of the numeric argument as a value in string format (default is 'yyyy-MM-dd HH:mm:ss'). numeric is an integer timestamp value representing seconds since '1970-01-01 00:00:00' UTC, such as produced by the UNIX_TIMESTAMP() function. The return value is expressed in the pipeline's local time zone. E.g., FROM_UNIXTIME(44) returns '1970-01-01 00:00:44' if in UTC time zone, but returns '1970-01-01 08:00:44' if in 'Asia/Shanghai' time zone. | +| UNIX_TIMESTAMP() | unixTimestamp() | Gets current Unix timestamp in seconds. This function is not deterministic, which means the value is recalculated for each record. | +| UNIX_TIMESTAMP(string1[, string2]) | unixTimestamp(STRING1[, STRING2]) | Converts a date time string string1 with format string2 (by default: yyyy-MM-dd HH:mm:ss if not specified) to Unix timestamp (in seconds), using the pipeline's local time zone.<br/>If a time zone is specified in the date time string and parsed by a UTC+X pattern such as 'yyyy-MM-dd HH:mm:ss.SSS X', this function uses the time zone from the date time string instead of the pipeline's local time zone. If the date time string cannot be parsed, the default value Long.MIN_VALUE (-9223372036854775808) is returned. |
## Conditional Functions diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/DateTimeUtils.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/DateTimeUtils.java index b923107e2..1fb080de9 100644 --- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/DateTimeUtils.java +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/DateTimeUtils.java @@ -43,6 +43,16 @@ public class DateTimeUtils { */ public static final long MILLIS_PER_DAY = 86400000L; // = 24 * 60 * 60 * 1000 + /** The SimpleDateFormat string for ISO dates, "yyyy-MM-dd". */ + private static final String DATE_FORMAT_STRING = "yyyy-MM-dd"; + + /** The SimpleDateFormat string for ISO times, "HH:mm:ss". */ + private static final String TIME_FORMAT_STRING = "HH:mm:ss"; + + /** The SimpleDateFormat string for ISO timestamps, "yyyy-MM-dd HH:mm:ss". */ + private static final String TIMESTAMP_FORMAT_STRING = + DATE_FORMAT_STRING + " " + TIME_FORMAT_STRING; + /** * A ThreadLocal cache map for SimpleDateFormat, because SimpleDateFormat is not thread-safe. * (string_format) => formatter @@ -109,7 +119,7 @@ public class DateTimeUtils { } catch (ParseException e) { LOG.error( String.format( - "Exception when parsing datetime string '%s' in format '%s'", + "Exception when parsing datetime string '%s' in format '%s', the default value Long.MIN_VALUE(-9223372036854775808) will be returned.", dateStr, format), e); return Long.MIN_VALUE; @@ -128,6 +138,56 @@ public class DateTimeUtils { return day + (153 * m + 2) / 5 + 365 * y + y / 4 - y / 100 + y / 400 - 32045; } + // -------------------------------------------------------------------------------------------- + // UNIX TIME + // -------------------------------------------------------------------------------------------- + + /** + * Convert unix timestamp (seconds since '1970-01-01 00:00:00' UTC) to datetime string in the + * "yyyy-MM-dd HH:mm:ss" format, rendered in the given time zone; returns null if formatting fails.
+ */ + public static String formatUnixTimestamp(long unixTime, TimeZone timeZone) { + return formatUnixTimestamp(unixTime, TIMESTAMP_FORMAT_STRING, timeZone); + } + + /** + * Convert unix timestamp (seconds since '1970-01-01 00:00:00' UTC) to datetime string in the + * given format, rendered in the given time zone. Returns null if formatting fails. + */ + public static String formatUnixTimestamp(long unixTime, String format, TimeZone timeZone) { + SimpleDateFormat formatter = FORMATTER_CACHE.get(format); + formatter.setTimeZone(timeZone); /* the cached formatter for this format is re-zoned on every call */ + Date date = new Date(unixTime * 1000); /* NOTE(review): unixTime * 1000 overflows silently when |unixTime| > Long.MAX_VALUE / 1000 */ + try { + return formatter.format(date); + } catch (Exception e) { + LOG.error("Exception when formatting.", e); + return null; + } + } + + /** + * Parses dateStr with the "yyyy-MM-dd HH:mm:ss" pattern in timeZone and returns the seconds since + * '1970-01-01 00:00:00' UTC (negative before the epoch), or Long.MIN_VALUE if dateStr cannot be parsed. + */ + public static long unixTimestamp(String dateStr, TimeZone timeZone) { + return unixTimestamp(dateStr, TIMESTAMP_FORMAT_STRING, timeZone); + } + + /** + * Parses dateStr with the given format pattern in timeZone (an offset parsed from dateStr, e.g. via the X pattern letter, overrides timeZone)
+ * and returns the seconds since '1970-01-01 00:00:00' UTC, or Long.MIN_VALUE if dateStr cannot be parsed. + */ + public static long unixTimestamp(String dateStr, String format, TimeZone timeZone) { + long ts = internalParseTimestampMillis(dateStr, format, timeZone); + if (ts == Long.MIN_VALUE) { + return Long.MIN_VALUE; + } else { + // return the seconds; NOTE(review): '/' truncates toward zero, so pre-epoch values with a millisecond fraction round up (e.g. -1500 ms -> -1 s) - confirm this is intended + return ts / 1000; + } + } + // -------------------------------------------------------------------------------------------- // Format // -------------------------------------------------------------------------------------------- diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java index 6f6d52a3b..a7555203c 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java @@ -80,6 +80,27 @@ public class SystemFunctionUtils { return timestampMillisToDate(localtimestamp(epochTime, timezone).getMillisecond()); } + public static String fromUnixtime(long seconds, String timezone) { + return DateTimeUtils.formatUnixTimestamp(seconds, TimeZone.getTimeZone(timezone)); + } + + public static String fromUnixtime(long seconds, String format, String timezone) { + return DateTimeUtils.formatUnixTimestamp(seconds, format, TimeZone.getTimeZone(timezone)); + } + + public static long unixTimestamp(long epochTime, String timezone) { /* timezone is unused: the compiler appends __epoch_time__ and __time_zone__ to every UNIX_TIMESTAMP call; epochTime is presumably epoch milliseconds - TODO confirm */ + return epochTime / 1000; + } + + public static long unixTimestamp(String dateTimeStr, long epochTime, String timezone) { /* epochTime is unused; it is only here because the compiler appends __epoch_time__ and __time_zone__ */ + return DateTimeUtils.unixTimestamp(dateTimeStr, TimeZone.getTimeZone(timezone)); + } + + public static long unixTimestamp( + String dateTimeStr, String format, long epochTime, String timezone) { /* epochTime is unused (compiler-appended argument) */ + return DateTimeUtils.unixTimestamp(dateTimeStr, format, TimeZone.getTimeZone(timezone)); + } + public static String dateFormat(TimestampData timestamp, String format) { return
DateTimeUtils.formatTimestampMillis( timestamp.getMillisecond(), format, TimeZone.getTimeZone("UTC")); diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java index 6f5b26125..f60bf968c 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java @@ -57,13 +57,18 @@ public class JaninoCompiler { Arrays.asList("CURRENT_TIMESTAMP", "NOW"); private static final List<String> TIMEZONE_REQUIRED_TEMPORAL_FUNCTIONS = - Arrays.asList("LOCALTIME", "LOCALTIMESTAMP", "CURRENT_TIME", "CURRENT_DATE"); + Arrays.asList( + "LOCALTIME", + "LOCALTIMESTAMP", + "CURRENT_TIME", + "CURRENT_DATE", + "UNIX_TIMESTAMP"); /* UNIX_TIMESTAMP calls are compiled with __epoch_time__ and __time_zone__ appended, e.g. unixTimestamp(__epoch_time__, __time_zone__) */ private static final List<String> TIMEZONE_FREE_TEMPORAL_CONVERSION_FUNCTIONS = Arrays.asList("DATE_FORMAT"); private static final List<String> TIMEZONE_REQUIRED_TEMPORAL_CONVERSION_FUNCTIONS = - Arrays.asList("TO_DATE", "TO_TIMESTAMP"); + Arrays.asList("TO_DATE", "TO_TIMESTAMP", "FROM_UNIXTIME"); /* FROM_UNIXTIME calls are compiled with __time_zone__ appended, e.g. fromUnixtime(44, __time_zone__) */ public static final String DEFAULT_EPOCH_TIME = "__epoch_time__"; public static final String DEFAULT_TIME_ZONE = "__time_zone__"; diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java index bb2c3503d..d47db49f8 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java @@ -212,6 +212,27 @@ public class TransformSqlOperatorTable extends ReflectiveSqlOperatorTable { return SqlSyntax.FUNCTION; } }; + public static final SqlFunction UNIX_TIMESTAMP = + new SqlFunction(
"UNIX_TIMESTAMP", + SqlKind.OTHER_FUNCTION, + ReturnTypes.BIGINT_NULLABLE, + null, + OperandTypes.or( + OperandTypes.NILADIC, + OperandTypes.family(SqlTypeFamily.CHARACTER), + OperandTypes.family(SqlTypeFamily.CHARACTER, SqlTypeFamily.CHARACTER)), + SqlFunctionCategory.TIMEDATE); + public static final SqlFunction FROM_UNIXTIME = + new SqlFunction( + "FROM_UNIXTIME", + SqlKind.OTHER_FUNCTION, + TransformSqlReturnTypes.VARCHAR_FORCE_NULLABLE, /* nullable because DateTimeUtils.formatUnixTimestamp may return null */ + null, + OperandTypes.or( + OperandTypes.family(SqlTypeFamily.INTEGER), /* NOTE(review): integer-family seconds only, while the docs call this argument "numeric" - confirm whether DECIMAL/DOUBLE should be accepted */ + OperandTypes.family(SqlTypeFamily.INTEGER, SqlTypeFamily.CHARACTER)), + SqlFunctionCategory.TIMEDATE); public static final SqlFunction DATE_FORMAT = new SqlFunction( "DATE_FORMAT", diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java index 2de893a09..f30c19946 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java @@ -113,6 +113,40 @@ public class PostTransformOperatorTest { .primaryKey("col1") .build(); + private static final TableId FROM_UNIX_TIME_TABLEID = + TableId.tableId("my_company", "my_branch", "from_unix_time_table"); + private static final Schema FROM_UNIX_TIME_SCHEMA = + Schema.newBuilder() + .physicalColumn("col1", DataTypes.STRING().notNull()) + .physicalColumn("seconds", DataTypes.BIGINT()) + .physicalColumn("format_str", DataTypes.STRING()) + .primaryKey("col1") + .build(); + private static final Schema EXPECTED_FROM_UNIX_TIME_SCHEMA = + Schema.newBuilder() + .physicalColumn("col1", DataTypes.STRING().notNull()) + .physicalColumn("from_unix_time", DataTypes.STRING()) + .physicalColumn("from_unix_time_format", DataTypes.STRING()) + .primaryKey("col1") + .build(); +
private static final TableId UNIX_TIMESTAMP_TABLEID = + TableId.tableId("my_company", "my_branch", "unix_timestamp_table"); + private static final Schema UNIX_TIMESTAMP_SCHEMA = + Schema.newBuilder() + .physicalColumn("col1", DataTypes.STRING().notNull()) + .physicalColumn("date_time_str", DataTypes.STRING()) + .physicalColumn("unix_timestamp_format", DataTypes.STRING()) + .primaryKey("col1") + .build(); + private static final Schema EXPECTED_UNIX_TIMESTAMP_SCHEMA = + Schema.newBuilder() + .physicalColumn("col1", DataTypes.STRING().notNull()) + .physicalColumn("unix_timestamp", DataTypes.BIGINT()) + .physicalColumn("unix_timestamp_format", DataTypes.BIGINT()) + .primaryKey("col1") + .build(); + private static final TableId TIMESTAMPDIFF_TABLEID = TableId.tableId("my_company", "my_branch", "timestampdiff_table"); private static final Schema TIMESTAMPDIFF_SCHEMA = @@ -799,6 +833,322 @@ public class PostTransformOperatorTest { transformFunctionEventEventOperatorTestHarness.close(); } + @Test + void testFromUnixTimeTransform() throws Exception { + // In UTC, from_unix_time(0s) ==> 1970-01-01 00:00:00 + testFromUnixTimeTransformWithTimeZone("UTC", 0L, "1970-01-01 00:00:00"); + // In UTC, from_unix_time(44s) ==> 1970-01-01 00:00:44 + testFromUnixTimeTransformWithTimeZone("UTC", 44L, "1970-01-01 00:00:44"); + // In Berlin, the time zone is +1:00, from_unix_time(44s) ==> 1970-01-01 01:00:44 + testFromUnixTimeTransformWithTimeZone("Europe/Berlin", 44L, "1970-01-01 01:00:44"); + // In Shanghai, the time zone is +8:00, from_unix_time(44s) ==> 1970-01-01 08:00:44 + testFromUnixTimeTransformWithTimeZone("Asia/Shanghai", 44L, "1970-01-01 08:00:44"); + } + + private void testFromUnixTimeTransformWithTimeZone( + String timeZone, Long seconds, String unixTimeStr) throws Exception { + PostTransformOperator transform = + PostTransformOperator.newBuilder() + .addTransform( + FROM_UNIX_TIME_TABLEID.identifier(), + "col1, FROM_UNIXTIME(seconds) as from_unix_time," + + "
FROM_UNIXTIME(seconds, format_str) as from_unix_time_format", + null) + .addTimezone(timeZone) + .build(); + RegularEventOperatorTestHarness<PostTransformOperator, Event> + transformFunctionEventEventOperatorTestHarness = + RegularEventOperatorTestHarness.with(transform, 1); + // Initialization + transformFunctionEventEventOperatorTestHarness.open(); + // Create table + CreateTableEvent createTableEvent = + new CreateTableEvent(FROM_UNIX_TIME_TABLEID, FROM_UNIX_TIME_SCHEMA); + BinaryRecordDataGenerator recordDataGenerator = + new BinaryRecordDataGenerator(((RowType) FROM_UNIX_TIME_SCHEMA.toRowDataType())); + BinaryRecordDataGenerator expectedRecordDataGenerator = + new BinaryRecordDataGenerator( + ((RowType) EXPECTED_FROM_UNIX_TIME_SCHEMA.toRowDataType())); + DataChangeEvent insertEvent = + DataChangeEvent.insertEvent( + FROM_UNIX_TIME_TABLEID, + recordDataGenerator.generate( + new Object[] { + new BinaryStringData("1"), + seconds, + new BinaryStringData("yyyy-MM-dd HH:mm:ss") + })); + DataChangeEvent insertEventExpect = + DataChangeEvent.insertEvent( + FROM_UNIX_TIME_TABLEID, + expectedRecordDataGenerator.generate( + new Object[] { + new BinaryStringData("1"), + new BinaryStringData(unixTimeStr), + new BinaryStringData(unixTimeStr) + })); + transform.processElement(new StreamRecord<>(createTableEvent)); + Assertions.assertThat( + transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll()) + .isEqualTo( + new StreamRecord<>( + new CreateTableEvent( + FROM_UNIX_TIME_TABLEID, EXPECTED_FROM_UNIX_TIME_SCHEMA))); + transform.processElement(new StreamRecord<>(insertEvent)); + Assertions.assertThat( + transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll()) + .isEqualTo(new StreamRecord<>(insertEventExpect)); + transformFunctionEventEventOperatorTestHarness.close(); + } + + /* + Converts a date time string string1 with format string2 (by default: yyyy-MM-dd HH:mm:ss if not specified) to Unix timestamp (in seconds), + using the specified
timezone in table config. + + If a time zone is specified in the date time string and parsed by UTC+X format such as “yyyy-MM-dd HH:mm:ss.SSS X”, + this function will use the specified timezone in the date time string instead of the timezone in table config. If the date time string can not be parsed, + the default value Long.MIN_VALUE(-9223372036854775808) will be returned. + */ + @Test + void testUnixTimestampTransformInBerlin() throws Exception { + PostTransformOperator transform = + PostTransformOperator.newBuilder() + .addTransform( + UNIX_TIMESTAMP_TABLEID.identifier(), + "col1," + + " UNIX_TIMESTAMP(date_time_str) as unix_timestamp," + + " UNIX_TIMESTAMP(date_time_str, unix_timestamp_format) as unix_timestamp_format", + null) + .addTimezone("Europe/Berlin") + .build(); + RegularEventOperatorTestHarness<PostTransformOperator, Event> + transformFunctionEventEventOperatorTestHarness = + RegularEventOperatorTestHarness.with(transform, 1); + // Initialization + transformFunctionEventEventOperatorTestHarness.open(); + // Create table + CreateTableEvent createTableEvent = + new CreateTableEvent(UNIX_TIMESTAMP_TABLEID, UNIX_TIMESTAMP_SCHEMA); + BinaryRecordDataGenerator recordDataGenerator = + new BinaryRecordDataGenerator(((RowType) UNIX_TIMESTAMP_SCHEMA.toRowDataType())); + BinaryRecordDataGenerator expectedRecordDataGenerator = + new BinaryRecordDataGenerator( + ((RowType) EXPECTED_UNIX_TIMESTAMP_SCHEMA.toRowDataType())); + transform.processElement(new StreamRecord<>(createTableEvent)); + Assertions.assertThat( + transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll()) + .isEqualTo( + new StreamRecord<>( + new CreateTableEvent( + UNIX_TIMESTAMP_TABLEID, EXPECTED_UNIX_TIMESTAMP_SCHEMA))); + + // In Berlin, "1970-01-01 08:00:01.001" formatted by "yyyy-MM-dd HH:mm:ss.SSS" ==> 25201L + DataChangeEvent insertEvent1 = + DataChangeEvent.insertEvent( + UNIX_TIMESTAMP_TABLEID, + recordDataGenerator.generate( + new Object[] { + new BinaryStringData("1"), +
new BinaryStringData("1970-01-01 08:00:01.001"), + new BinaryStringData("yyyy-MM-dd HH:mm:ss.SSS") + })); + DataChangeEvent insertEventExpect1 = + DataChangeEvent.insertEvent( + UNIX_TIMESTAMP_TABLEID, + expectedRecordDataGenerator.generate( + new Object[] {new BinaryStringData("1"), 25201L, 25201L})); + transform.processElement(new StreamRecord<>(insertEvent1)); + Assertions.assertThat( + transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll()) + .isEqualTo(new StreamRecord<>(insertEventExpect1)); + + // In Berlin, "1970-01-01 08:00:01.001 +0800" formatted by "yyyy-MM-dd HH:mm:ss.SSS X" ==> + // 1L + DataChangeEvent insertEvent2 = + DataChangeEvent.insertEvent( + UNIX_TIMESTAMP_TABLEID, + recordDataGenerator.generate( + new Object[] { + new BinaryStringData("2"), + new BinaryStringData("1970-01-01 08:00:01.001 +0800"), + new BinaryStringData("yyyy-MM-dd HH:mm:ss.SSS X") + })); + DataChangeEvent insertEventExpect2 = + DataChangeEvent.insertEvent( + UNIX_TIMESTAMP_TABLEID, + expectedRecordDataGenerator.generate( + new Object[] {new BinaryStringData("2"), 25201L, 1L})); + transform.processElement(new StreamRecord<>(insertEvent2)); + Assertions.assertThat( + transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll()) + .isEqualTo(new StreamRecord<>(insertEventExpect2)); + + // In Berlin, "1970-01-01 08:00:01.001 +0800" formatted by "yyyy-MM-dd HH:mm:ss.SSS" ==> + // 25201L + DataChangeEvent insertEvent3 = + DataChangeEvent.insertEvent( + UNIX_TIMESTAMP_TABLEID, + recordDataGenerator.generate( + new Object[] { + new BinaryStringData("3"), + new BinaryStringData("1970-01-01 08:00:01.001 +0800"), + new BinaryStringData("yyyy-MM-dd HH:mm:ss.SSS") + })); + DataChangeEvent insertEventExpect3 = + DataChangeEvent.insertEvent( + UNIX_TIMESTAMP_TABLEID, + expectedRecordDataGenerator.generate( + new Object[] {new BinaryStringData("3"), 25201L, 25201L})); + transform.processElement(new StreamRecord<>(insertEvent3)); + Assertions.assertThat( +
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll()) + .isEqualTo(new StreamRecord<>(insertEventExpect3)); + + // In Berlin, "1970-01-01 08:00:01.001" formatted by "yyyy-MM-dd HH:mm:ss.SSS X" ==> + // -9223372036854775808L + DataChangeEvent insertEvent4 = + DataChangeEvent.insertEvent( + UNIX_TIMESTAMP_TABLEID, + recordDataGenerator.generate( + new Object[] { + new BinaryStringData("4"), + new BinaryStringData("1970-01-01 08:00:01.001"), + new BinaryStringData("yyyy-MM-dd HH:mm:ss.SSS X") + })); + DataChangeEvent insertEventExpect4 = + DataChangeEvent.insertEvent( + UNIX_TIMESTAMP_TABLEID, + expectedRecordDataGenerator.generate( + new Object[] { + new BinaryStringData("4"), 25201L, -9223372036854775808L + })); + transform.processElement(new StreamRecord<>(insertEvent4)); + Assertions.assertThat( + transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll()) + .isEqualTo(new StreamRecord<>(insertEventExpect4)); + transformFunctionEventEventOperatorTestHarness.close(); + } + + @Test + void testUnixTimestampTransformInShanghai() throws Exception { + PostTransformOperator transform = + PostTransformOperator.newBuilder() + .addTransform( + UNIX_TIMESTAMP_TABLEID.identifier(), + "col1," + + " UNIX_TIMESTAMP(date_time_str) as unix_timestamp," + + " UNIX_TIMESTAMP(date_time_str, unix_timestamp_format) as unix_timestamp_format", + null) + .addTimezone("Asia/Shanghai") + .build(); + RegularEventOperatorTestHarness<PostTransformOperator, Event> + transformFunctionEventEventOperatorTestHarness = + RegularEventOperatorTestHarness.with(transform, 1); + // Initialization + transformFunctionEventEventOperatorTestHarness.open(); + // Create table + CreateTableEvent createTableEvent = + new CreateTableEvent(UNIX_TIMESTAMP_TABLEID, UNIX_TIMESTAMP_SCHEMA); + BinaryRecordDataGenerator recordDataGenerator = + new BinaryRecordDataGenerator(((RowType) UNIX_TIMESTAMP_SCHEMA.toRowDataType())); + BinaryRecordDataGenerator expectedRecordDataGenerator = + new
BinaryRecordDataGenerator( + ((RowType) EXPECTED_UNIX_TIMESTAMP_SCHEMA.toRowDataType())); + transform.processElement(new StreamRecord<>(createTableEvent)); + Assertions.assertThat( + transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll()) + .isEqualTo( + new StreamRecord<>( + new CreateTableEvent( + UNIX_TIMESTAMP_TABLEID, EXPECTED_UNIX_TIMESTAMP_SCHEMA))); + + // In Shanghai, "1970-01-01 08:00:01.001" formatted by "yyyy-MM-dd HH:mm:ss.SSS" ==> 1L + DataChangeEvent insertEvent1 = + DataChangeEvent.insertEvent( + UNIX_TIMESTAMP_TABLEID, + recordDataGenerator.generate( + new Object[] { + new BinaryStringData("1"), + new BinaryStringData("1970-01-01 08:00:01.001"), + new BinaryStringData("yyyy-MM-dd HH:mm:ss.SSS") + })); + DataChangeEvent insertEventExpect1 = + DataChangeEvent.insertEvent( + UNIX_TIMESTAMP_TABLEID, + expectedRecordDataGenerator.generate( + new Object[] {new BinaryStringData("1"), 1L, 1L})); + transform.processElement(new StreamRecord<>(insertEvent1)); + Assertions.assertThat( + transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll()) + .isEqualTo(new StreamRecord<>(insertEventExpect1)); + + // In Shanghai, "1970-01-01 08:00:01.001 +0100" formatted by "yyyy-MM-dd HH:mm:ss.SSS X" ==> + // 25201L (07:00:01 UTC; the default-format column ignores the offset and yields 1L) + DataChangeEvent insertEvent2 = + DataChangeEvent.insertEvent( + UNIX_TIMESTAMP_TABLEID, + recordDataGenerator.generate( + new Object[] { + new BinaryStringData("2"), + new BinaryStringData("1970-01-01 08:00:01.001 +0100"), + new BinaryStringData("yyyy-MM-dd HH:mm:ss.SSS X") + })); + DataChangeEvent insertEventExpect2 = + DataChangeEvent.insertEvent( + UNIX_TIMESTAMP_TABLEID, + expectedRecordDataGenerator.generate( + new Object[] {new BinaryStringData("2"), 1L, 25201L})); + transform.processElement(new StreamRecord<>(insertEvent2)); + Assertions.assertThat( + transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll()) + .isEqualTo(new StreamRecord<>(insertEventExpect2)); + + // In Shanghai, "1970-01-01 08:00:01.001
+0100" formatted by "yyyy-MM-dd HH:mm:ss.SSS" ==> + // 1L + DataChangeEvent insertEvent3 = + DataChangeEvent.insertEvent( + UNIX_TIMESTAMP_TABLEID, + recordDataGenerator.generate( + new Object[] { + new BinaryStringData("3"), + new BinaryStringData("1970-01-01 08:00:01.001 +0100"), + new BinaryStringData("yyyy-MM-dd HH:mm:ss.SSS") + })); + DataChangeEvent insertEventExpect3 = + DataChangeEvent.insertEvent( + UNIX_TIMESTAMP_TABLEID, + expectedRecordDataGenerator.generate( + new Object[] {new BinaryStringData("3"), 1L, 1L})); + transform.processElement(new StreamRecord<>(insertEvent3)); + Assertions.assertThat( + transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll()) + .isEqualTo(new StreamRecord<>(insertEventExpect3)); + + // In Shanghai, "1970-01-01 08:00:01.001" formatted by "yyyy-MM-dd HH:mm:ss.SSS X" ==> + // -9223372036854775808L + DataChangeEvent insertEvent4 = + DataChangeEvent.insertEvent( + UNIX_TIMESTAMP_TABLEID, + recordDataGenerator.generate( + new Object[] { + new BinaryStringData("4"), + new BinaryStringData("1970-01-01 08:00:01.001"), + new BinaryStringData("yyyy-MM-dd HH:mm:ss.SSS X") + })); + DataChangeEvent insertEventExpect4 = + DataChangeEvent.insertEvent( + UNIX_TIMESTAMP_TABLEID, + expectedRecordDataGenerator.generate( + new Object[] { + new BinaryStringData("4"), 1L, -9223372036854775808L + })); + transform.processElement(new StreamRecord<>(insertEvent4)); + Assertions.assertThat( + transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll()) + .isEqualTo(new StreamRecord<>(insertEventExpect4)); + transformFunctionEventEventOperatorTestHarness.close(); + } + @Test void testTimestampDiffTransform() throws Exception { PostTransformOperator transform = diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java index bc5d12005..478d4e92e 100644 ---
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java @@ -243,6 +243,17 @@ public class TransformParserTest { testFilterExpression( "id = CURRENT_TIMESTAMP", "valueEquals(id, currentTimestamp(__epoch_time__))"); testFilterExpression("NOW()", "now(__epoch_time__)"); + testFilterExpression("FROM_UNIXTIME(44)", "fromUnixtime(44, __time_zone__)"); + testFilterExpression( + "FROM_UNIXTIME(44, 'yyyy/MM/dd HH:mm:ss')", + "fromUnixtime(44, \"yyyy/MM/dd HH:mm:ss\", __time_zone__)"); + testFilterExpression("UNIX_TIMESTAMP()", "unixTimestamp(__epoch_time__, __time_zone__)"); + testFilterExpression( + "UNIX_TIMESTAMP('1970-01-01 08:00:01')", + "unixTimestamp(\"1970-01-01 08:00:01\", __epoch_time__, __time_zone__)"); + testFilterExpression( + "UNIX_TIMESTAMP('1970-01-01 08:00:01.001 +0800', 'yyyy-MM-dd HH:mm:ss.SSS X')", + "unixTimestamp(\"1970-01-01 08:00:01.001 +0800\", \"yyyy-MM-dd HH:mm:ss.SSS X\", __epoch_time__, __time_zone__)"); testFilterExpression("YEAR(dt)", "year(dt)"); testFilterExpression("QUARTER(dt)", "quarter(dt)"); testFilterExpression("MONTH(dt)", "month(dt)");