This is an automated email from the ASF dual-hosted git repository. snuyanzin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new c730292d645 [FLINK-36862][table] Implement additional `TO_TIMESTAMP_LTZ` functions c730292d645 is described below commit c730292d645fc7ea24659f791eb8599b2c849408 Author: Yiyu Tian <120529776+yiyuti...@users.noreply.github.com> AuthorDate: Sun Jan 12 12:30:43 2025 -0800 [FLINK-36862][table] Implement additional `TO_TIMESTAMP_LTZ` functions --- docs/data/sql_functions.yml | 7 +- docs/data/sql_functions_zh.yml | 11 +- flink-python/pyflink/table/expressions.py | 48 ++++- .../pyflink/table/tests/test_expression.py | 9 +- .../org/apache/flink/table/api/Expressions.java | 55 ++++++ .../table/api/ImplicitExpressionConversions.scala | 11 -- .../functions/BuiltInFunctionDefinitions.java | 21 ++- .../strategies/ToTimestampLtzTypeStrategy.java | 65 ++++++- .../apache/flink/table/utils/DateTimeUtils.java | 10 +- .../strategies/ToTimestampLtzTypeStrategyTest.java | 98 ++++++++++ .../expressions/converter/DirectConvertRule.java | 3 - .../functions/sql/FlinkSqlOperatorTable.java | 11 -- .../planner/codegen/calls/BuiltInMethods.scala | 13 -- .../planner/codegen/calls/FunctionGenerator.scala | 21 --- .../planner/codegen/calls/StringCallGen.scala | 2 +- .../planner/functions/TimeFunctionsITCase.java | 205 ++++++++++++++++++++- .../planner/plan/batch/sql/TimeTravelTest.java | 2 +- .../planner/expressions/TemporalTypesTest.scala | 25 +-- .../functions/scalar/ToTimestampLtzFunction.java | 139 ++++++++++++++ 19 files changed, 650 insertions(+), 106 deletions(-) diff --git a/docs/data/sql_functions.yml b/docs/data/sql_functions.yml index eda6d3de753..2ff7353ebd2 100644 --- a/docs/data/sql_functions.yml +++ b/docs/data/sql_functions.yml @@ -679,9 +679,12 @@ temporal: - sql: TO_DATE(string1[, string2]) table: toDate(STRING1[, STRING2]) description: Converts a date string string1 with format string2 (by default 'yyyy-MM-dd') to a date. 
- - sql: TO_TIMESTAMP_LTZ(numeric, precision) + - sql: TO_TIMESTAMP_LTZ(numeric[, precision]) table: toTimestampLtz(NUMERIC, PRECISION) - description: "Converts a epoch seconds or epoch milliseconds to a TIMESTAMP_LTZ, the valid precision is 0 or 3, the 0 represents TO_TIMESTAMP_LTZ(epochSeconds, 0), the 3 represents TO_TIMESTAMP_LTZ(epochMilliseconds, 3)." + description: Converts an epoch seconds or epoch milliseconds to a TIMESTAMP_LTZ, the valid precision is 0 or 3, the 0 represents TO_TIMESTAMP_LTZ(epochSeconds, 0), the 3 represents TO_TIMESTAMP_LTZ(epochMilliseconds, 3). If no precision is provided, the default precision is 3. If any input is null, the function will return null. + - sql: TO_TIMESTAMP_LTZ(string1[, string2[, string3]]) + table: toTimestampLtz(STRING1[, STRING2[, STRING3]]) + description: Converts a timestamp string string1 with format string2 (by default 'yyyy-MM-dd HH:mm:ss.SSS') in time zone string3 (by default 'UTC') to a TIMESTAMP_LTZ. If any input is null, the function will return null. - sql: TO_TIMESTAMP(string1[, string2]) table: toTimestamp(STRING1[, STRING2]) description: "Converts date time string string1 with format string2 (by default: 'yyyy-MM-dd HH:mm:ss') to a timestamp, without time zone." 
diff --git a/docs/data/sql_functions_zh.yml b/docs/data/sql_functions_zh.yml index d0250acc6f6..06f5ce34ef9 100644 --- a/docs/data/sql_functions_zh.yml +++ b/docs/data/sql_functions_zh.yml @@ -805,11 +805,12 @@ temporal: - sql: TO_DATE(string1[, string2]) table: toDate(STRING1[, STRING2]) description: 将格式为 string2(默认为 'yyyy-MM-dd')的字符串 string1 转换为日期。 - - sql: TO_TIMESTAMP_LTZ(numeric, precision) - table: toTimestampLtz(numeric, PRECISION) - description: | - 将纪元秒或纪元毫秒转换为 TIMESTAMP_LTZ,有效精度为 0 或 3,0 代表 `TO_TIMESTAMP_LTZ(epochSeconds, 0)`, - 3 代表` TO_TIMESTAMP_LTZ(epochMilliseconds, 3)`。 + - sql: TO_TIMESTAMP_LTZ(numeric[, precision]) + table: toTimestampLtz(NUMERIC, PRECISION) + description: 将纪元秒或纪元毫秒转换为 TIMESTAMP_LTZ,有效精度为 0 或 3,0 代表 `TO_TIMESTAMP_LTZ(epochSeconds, 0)`,3 代表 `TO_TIMESTAMP_LTZ(epochMilliseconds, 3)`。如果未提供精度,默认精度为 3。如果任一输入为 null,函数将返回 null。 + - sql: TO_TIMESTAMP_LTZ(string1[, string2[, string3]]) + table: toTimestampLtz(STRING1[, STRING2[, STRING3]]) + description: 将格式为 string2(默认为 'yyyy-MM-dd HH:mm:ss.SSS')、时区为 string3(默认为 'UTC')的时间戳字符串 string1 转换为 TIMESTAMP_LTZ。如果任一输入为 null,函数将返回 null。 
- sql: TO_TIMESTAMP(string1[, string2]) table: toTimestamp(STRING1[, STRING2]) description: 将格式为 string2(默认为:'yyyy-MM-dd HH:mm:ss')的字符串 string1 转换为 timestamp,不带时区。 diff --git a/flink-python/pyflink/table/expressions.py b/flink-python/pyflink/table/expressions.py index 3b326cb4641..0e73fc78369 100644 --- a/flink-python/pyflink/table/expressions.py +++ b/flink-python/pyflink/table/expressions.py @@ -306,19 +306,47 @@ def to_timestamp(timestamp_str: Union[str, Expression[str]], return _binary_op("toTimestamp", timestamp_str, format) -def to_timestamp_ltz(numeric_epoch_time, precision) -> Expression: - """ - Converts a numeric type epoch time to TIMESTAMP_LTZ. +def to_timestamp_ltz(*args) -> Expression: + """ + Converts a value to a timestamp with local time zone. + + Supported functions: + 1. to_timestamp_ltz(Numeric) -> DataTypes.TIMESTAMP_LTZ + Converts a numeric value of epoch milliseconds to a TIMESTAMP_LTZ. The default precision is 3. + 2. to_timestamp_ltz(Numeric, Integer) -> DataTypes.TIMESTAMP_LTZ + Converts a numeric value of epoch seconds or epoch milliseconds to a TIMESTAMP_LTZ. + Valid precisions are 0 or 3. + 3. to_timestamp_ltz(String) -> DataTypes.TIMESTAMP_LTZ + Converts a timestamp string using default format 'yyyy-MM-dd HH:mm:ss.SSS' to a TIMESTAMP_LTZ. + 4. to_timestamp_ltz(String, String) -> DataTypes.TIMESTAMP_LTZ + Converts a timestamp string using format (default 'yyyy-MM-dd HH:mm:ss.SSS') to a TIMESTAMP_LTZ. + 5. to_timestamp_ltz(String, String, String) -> DataTypes.TIMESTAMP_LTZ + Converts a timestamp string string1 using format string2 (default 'yyyy-MM-dd HH:mm:ss.SSS') + in time zone string3 (default 'UTC') to a TIMESTAMP_LTZ. + Supports any timezone that is available in Java's TimeZone database. - The supported precision is 0 or 3: - 0 means the numericEpochTime is in second. - 3 means the numericEpochTime is in millisecond. 
+ Example: + :: - :param numeric_epoch_time: The epoch time with numeric type - :param precision: The precision to indicate the epoch time is in second or millisecond - :return: The timestamp value with TIMESTAMP_LTZ type. + >>> table.select(to_timestamp_ltz(100)) # numeric with default precision + >>> table.select(to_timestamp_ltz(100, 0)) # numeric with second precision + >>> table.select(to_timestamp_ltz(100, 3)) # numeric with millisecond precision + >>> table.select(to_timestamp_ltz("2023-01-01 00:00:00")) # string with default format + >>> table.select(to_timestamp_ltz("01/01/2023", "MM/dd/yyyy")) # string with format + >>> table.select(to_timestamp_ltz("2023-01-01 00:00:00", + "yyyy-MM-dd HH:mm:ss", + "UTC")) # string with format and timezone """ - return _binary_op("toTimestampLtz", numeric_epoch_time, precision) + if len(args) == 1: + return _unary_op("toTimestampLtz", lit(args[0])) + + # For two arguments case (numeric + precision or string + format) + elif len(args) == 2: + return _binary_op("toTimestampLtz", lit(args[0]), lit(args[1])) + + # For three arguments case (string + format + timezone) + else: + return _ternary_op("toTimestampLtz", lit(args[0]), lit(args[1]), lit(args[2])) def temporal_overlaps(left_time_point, diff --git a/flink-python/pyflink/table/tests/test_expression.py b/flink-python/pyflink/table/tests/test_expression.py index 2ea4bbdffbd..ef5a763f31e 100644 --- a/flink-python/pyflink/table/tests/test_expression.py +++ b/flink-python/pyflink/table/tests/test_expression.py @@ -285,7 +285,14 @@ class PyFlinkBatchExpressionTests(PyFlinkTestCase): self.assertEqual("toDate('2018-03-18')", str(to_date('2018-03-18'))) self.assertEqual("toDate('2018-03-18', 'yyyy-MM-dd')", str(to_date('2018-03-18', 'yyyy-MM-dd'))) - self.assertEqual('toTimestampLtz(123, 0)', str(to_timestamp_ltz(123, 0))) + self.assertEqual('TO_TIMESTAMP_LTZ(100)', str(to_timestamp_ltz(100))) + self.assertEqual("TO_TIMESTAMP_LTZ('2023-01-01 00:00:00')", + 
str(to_timestamp_ltz('2023-01-01 00:00:00'))) + self.assertEqual("TO_TIMESTAMP_LTZ('01/01/2023 00:00:00', 'MM/dd/yyyy HH:mm:ss')", + str(to_timestamp_ltz("01/01/2023 00:00:00", "MM/dd/yyyy HH:mm:ss"))) + self.assertEqual("TO_TIMESTAMP_LTZ('2023-01-01 00:00:00', 'yyyy-MM-dd HH:mm:ss', 'UTC')", + str(to_timestamp_ltz("2023-01-01 00:00:00", "yyyy-MM-dd HH:mm:ss", "UTC"))) + self.assertEqual("TO_TIMESTAMP_LTZ(123, 0)", str(to_timestamp_ltz(123, 0))) self.assertEqual("toTimestamp('1970-01-01 08:01:40')", str(to_timestamp('1970-01-01 08:01:40'))) self.assertEqual("toTimestamp('1970-01-01 08:01:40', 'yyyy-MM-dd HH:mm:ss')", diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java index 9b687ae65a0..84365f5f5ed 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java @@ -366,6 +366,61 @@ public final class Expressions { return apiCall(BuiltInFunctionDefinitions.TO_TIMESTAMP_LTZ, numericEpochTime, precision); } + /** + * Converts the given time string with the specified format to {@link + * DataTypes#TIMESTAMP_LTZ(int)}. + * + * @param timestampStr The timestamp string to convert. + * @param format The format of the string. + * @return The timestamp value with {@link DataTypes#TIMESTAMP_LTZ(int)} type. + */ + public static ApiExpression toTimestampLtz(String timestampStr, String format) { + return apiCall(BuiltInFunctionDefinitions.TO_TIMESTAMP_LTZ, timestampStr, format); + } + + /** + * Converts a timestamp to {@link DataTypes#TIMESTAMP_LTZ(int)}. + * + * <p>This method takes a string representing a timestamp and converts it to a TIMESTAMP_LTZ + * using the built-in TO_TIMESTAMP_LTZ function definition. + * + * @param timeStamp The timestamp string to be converted. 
+ * @return The timestamp value with {@link DataTypes#TIMESTAMP_LTZ(int)} type. + */ + public static ApiExpression toTimestampLtz(String timeStamp) { + return apiCall(BuiltInFunctionDefinitions.TO_TIMESTAMP_LTZ, timeStamp); + } + + /** + * Converts a numeric type epoch time to {@link DataTypes#TIMESTAMP_LTZ(int)}. + * + * <p>This method takes an object representing an epoch time and converts it to a TIMESTAMP_LTZ + * using the built-in TO_TIMESTAMP_LTZ function definition. + * + * @param numericEpochTime The epoch time with numeric type. + * @return The timestamp value with {@link DataTypes#TIMESTAMP_LTZ(int)} type. + */ + public static ApiExpression toTimestampLtz(Object numericEpochTime) { + return apiCall(BuiltInFunctionDefinitions.TO_TIMESTAMP_LTZ, numericEpochTime); + } + + /** + * Converts a string timestamp with the custom format and timezone to {@link + * DataTypes#TIMESTAMP_LTZ(int)}. + * + * <p>The timestamp string will be parsed using the custom format and timezone, and converted to + * a TIMESTAMP_LTZ value. + * + * @param timestampStr The timestamp string to convert. + * @param format The format pattern to parse the timestamp string. + * @param timezone The timezone to use for the conversion. + * @return The timestamp value with {@link DataTypes#TIMESTAMP_LTZ(int)} type. + */ + public static ApiExpression toTimestampLtz( + Object timestampStr, Object format, Object timezone) { + return apiCall(BuiltInFunctionDefinitions.TO_TIMESTAMP_LTZ, timestampStr, format, timezone); + } + /** * Determines whether two anchored time intervals overlap. Time point and temporal are * transformed into a range defined by two time points (start, end). 
The function evaluates diff --git a/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/ImplicitExpressionConversions.scala b/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/ImplicitExpressionConversions.scala index d889851064a..b1b59ed663a 100644 --- a/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/ImplicitExpressionConversions.scala +++ b/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/ImplicitExpressionConversions.scala @@ -519,17 +519,6 @@ trait ImplicitExpressionConversions { Expressions.toTimestamp(timestampStr, format) } - /** - * Converts a numeric type epoch time to [[DataTypes#TIMESTAMP_LTZ]]. - * - * The supported precision is 0 or 3: - * - 0 means the numericEpochTime is in second. - * - 3 means the numericEpochTime is in millisecond. - */ - def toTimestampLtz(numericEpochTime: Expression, precision: Expression): Expression = { - Expressions.toTimestampLtz(numericEpochTime, precision) - } - /** * Determines whether two anchored time intervals overlap. Time point and temporal are transformed * into a range defined by two time points (start, end). 
The function evaluates <code>leftEnd >= diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java index ec4e4291089..2bd61bd04cb 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java @@ -2332,14 +2332,25 @@ public final class BuiltInFunctionDefinitions { public static final BuiltInFunctionDefinition TO_TIMESTAMP_LTZ = BuiltInFunctionDefinition.newBuilder() - .name("toTimestampLtz") - .sqlName("TO_TIMESTAMP_LTZ") + .name("TO_TIMESTAMP_LTZ") .kind(SCALAR) .inputTypeStrategy( - sequence( - logical(LogicalTypeFamily.NUMERIC), - logical(LogicalTypeFamily.INTEGER_NUMERIC, false))) + or( + sequence(logical(LogicalTypeFamily.CHARACTER_STRING)), + sequence( + logical(LogicalTypeFamily.CHARACTER_STRING), + logical(LogicalTypeFamily.CHARACTER_STRING)), + sequence( + logical(LogicalTypeFamily.CHARACTER_STRING), + logical(LogicalTypeFamily.CHARACTER_STRING), + logical(LogicalTypeFamily.CHARACTER_STRING)), + sequence(logical(LogicalTypeFamily.NUMERIC)), + sequence( + logical(LogicalTypeFamily.NUMERIC), + logical(LogicalTypeFamily.INTEGER_NUMERIC)))) .outputTypeStrategy(SpecificTypeStrategies.TO_TIMESTAMP_LTZ) + .runtimeClass( + "org.apache.flink.table.runtime.functions.scalar.ToTimestampLtzFunction") .build(); public static final BuiltInFunctionDefinition TO_TIMESTAMP = diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToTimestampLtzTypeStrategy.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToTimestampLtzTypeStrategy.java index 722dd63e51d..fe11e91e432 100644 --- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToTimestampLtzTypeStrategy.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToTimestampLtzTypeStrategy.java @@ -20,22 +20,79 @@ package org.apache.flink.table.types.inference.strategies; import org.apache.flink.annotation.Internal; import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.inference.CallContext; import org.apache.flink.table.types.inference.TypeStrategy; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeFamily; +import org.apache.flink.table.types.logical.LogicalTypeRoot; +import java.util.List; import java.util.Optional; /** Type strategy of {@code TO_TIMESTAMP_LTZ}. */ @Internal public class ToTimestampLtzTypeStrategy implements TypeStrategy { + private static final int DEFAULT_PRECISION = 3; + @Override public Optional<DataType> inferType(CallContext callContext) { - if (callContext.isArgumentLiteral(1)) { - final int precision = callContext.getArgumentValue(1, Integer.class).get(); - return Optional.of(DataTypes.TIMESTAMP_LTZ(precision)); + List<DataType> argumentTypes = callContext.getArgumentDataTypes(); + int argCount = argumentTypes.size(); + + if (argCount < 1 || argCount > 3) { + throw new ValidationException( + "Unsupported argument type. " + + "TO_TIMESTAMP_LTZ requires 1 to 3 arguments, but " + + argCount + + " were provided."); + } + + LogicalType firstType = argumentTypes.get(0).getLogicalType(); + LogicalTypeRoot firstTypeRoot = firstType.getTypeRoot(); + + if (argCount == 1) { + if (!isCharacterType(firstTypeRoot) && !firstType.is(LogicalTypeFamily.NUMERIC)) { + throw new ValidationException( + "Unsupported argument type. 
" + + "When taking 1 argument, TO_TIMESTAMP_LTZ accepts an argument of type <VARCHAR>, <CHAR>, or <NUMERIC>."); + } + } else if (argCount == 2) { + LogicalType secondType = argumentTypes.get(1).getLogicalType(); + LogicalTypeRoot secondTypeRoot = secondType.getTypeRoot(); + if (firstType.is(LogicalTypeFamily.NUMERIC)) { + if (secondTypeRoot != LogicalTypeRoot.INTEGER) { + throw new ValidationException( + "Unsupported argument type. " + + "TO_TIMESTAMP_LTZ(<NUMERIC>, <INTEGER>) requires the second argument to be <INTEGER>."); + } + } else if (isCharacterType(firstTypeRoot)) { + if (!isCharacterType(secondTypeRoot)) { + throw new ValidationException( + "Unsupported argument type. " + + "If the first argument is of type <VARCHAR> or <CHAR>, TO_TIMESTAMP_LTZ requires the second argument to be of type <VARCHAR> or <CHAR>."); + } + } else { + throw new ValidationException( + "Unsupported argument type. " + + "When taking 2 arguments, TO_TIMESTAMP_LTZ requires the first argument to be of type <VARCHAR>, <CHAR>, or <NUMERIC>."); + } + } else if (argCount == 3) { + if (!isCharacterType(firstTypeRoot) + || !isCharacterType(argumentTypes.get(1).getLogicalType().getTypeRoot()) + || !isCharacterType(argumentTypes.get(2).getLogicalType().getTypeRoot())) { + throw new ValidationException( + "Unsupported argument type. 
" + + "When taking 3 arguments, TO_TIMESTAMP_LTZ requires all three arguments to be of type <VARCHAR> or <CHAR>."); + } } - return Optional.of(DataTypes.TIMESTAMP_LTZ(3)); + + return Optional.of(DataTypes.TIMESTAMP_LTZ(DEFAULT_PRECISION).nullable()); + } + + private boolean isCharacterType(LogicalTypeRoot typeRoot) { + return typeRoot == LogicalTypeRoot.CHAR || typeRoot == LogicalTypeRoot.VARCHAR; } } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/DateTimeUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/DateTimeUtils.java index c8d0f2e7311..4346c437bb8 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/DateTimeUtils.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/DateTimeUtils.java @@ -142,6 +142,8 @@ public class DateTimeUtils { .optionalEnd() .toFormatter(); + private static final Integer DEFAULT_PRECISION = 3; + /** * A ThreadLocal cache map for SimpleDateFormat, because SimpleDateFormat is not thread-safe. 
* (string_format) => formatter @@ -422,8 +424,12 @@ public class DateTimeUtils { } public static TimestampData parseTimestampData(String dateStr, String format) { - DateTimeFormatter formatter = DATETIME_FORMATTER_CACHE.get(format); - + DateTimeFormatter formatter; + try { + formatter = DATETIME_FORMATTER_CACHE.get(format); + } catch (IllegalArgumentException e) { + return null; + } try { TemporalAccessor accessor = formatter.parse(dateStr); // Precision is hardcoded to match signature of TO_TIMESTAMP diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/ToTimestampLtzTypeStrategyTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/ToTimestampLtzTypeStrategyTest.java new file mode 100644 index 00000000000..27331bb7e6c --- /dev/null +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/ToTimestampLtzTypeStrategyTest.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.types.inference.strategies; + +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.types.inference.TypeStrategiesTestBase; + +import java.util.stream.Stream; + +/** Tests for {@link ToTimestampLtzTypeStrategy}. */ +class ToTimestampLtzTypeStrategyTest extends TypeStrategiesTestBase { + + @Override + protected Stream<TestSpec> testData() { + return Stream.of( + TestSpec.forStrategy( + "Valid single argument of type <VARCHAR> or <CHAR>", + SpecificTypeStrategies.TO_TIMESTAMP_LTZ) + .inputTypes(DataTypes.STRING()) + .expectDataType(DataTypes.TIMESTAMP_LTZ(3).nullable()), + TestSpec.forStrategy( + "TO_TIMESTAMP_LTZ(<NUMERIC>)", + SpecificTypeStrategies.TO_TIMESTAMP_LTZ) + .inputTypes(DataTypes.BIGINT()) + .expectDataType(DataTypes.TIMESTAMP_LTZ(3).nullable()), + TestSpec.forStrategy( + "Invalid single argument type", + SpecificTypeStrategies.TO_TIMESTAMP_LTZ) + .inputTypes(DataTypes.BOOLEAN()) + .expectErrorMessage( + "Unsupported argument type. When taking 1 argument, TO_TIMESTAMP_LTZ accepts an argument of type <VARCHAR>, <CHAR>, or <NUMERIC>."), + TestSpec.forStrategy( + "TO_TIMESTAMP_LTZ(<NUMERIC>, <INTEGER>)", + SpecificTypeStrategies.TO_TIMESTAMP_LTZ) + .inputTypes(DataTypes.DOUBLE(), DataTypes.INT()) + .expectDataType(DataTypes.TIMESTAMP_LTZ(3).nullable()), + TestSpec.forStrategy( + "Valid two arguments of <VARCHAR> or <CHAR>", + SpecificTypeStrategies.TO_TIMESTAMP_LTZ) + .inputTypes(DataTypes.STRING(), DataTypes.STRING()) + .expectDataType(DataTypes.TIMESTAMP_LTZ(3).nullable()), + TestSpec.forStrategy( + "Invalid second argument when the first argument is <NUMERIC>", + SpecificTypeStrategies.TO_TIMESTAMP_LTZ) + .inputTypes(DataTypes.BIGINT(), DataTypes.STRING()) + .expectErrorMessage( + "Unsupported argument type. 
TO_TIMESTAMP_LTZ(<NUMERIC>, <INTEGER>) requires the second argument to be <INTEGER>."), + TestSpec.forStrategy( + "Invalid second argument when the first argument is <VARCHAR> or <CHAR>", + SpecificTypeStrategies.TO_TIMESTAMP_LTZ) + .inputTypes(DataTypes.STRING(), DataTypes.FLOAT()) + .expectErrorMessage( + "Unsupported argument type. If the first argument is of type <VARCHAR> or <CHAR>, TO_TIMESTAMP_LTZ requires the second argument to be of type <VARCHAR> or <CHAR>."), + TestSpec.forStrategy( + "Invalid first argument when taking 2 arguments", + SpecificTypeStrategies.TO_TIMESTAMP_LTZ) + .inputTypes(DataTypes.BOOLEAN(), DataTypes.FLOAT()) + .expectErrorMessage( + "Unsupported argument type. When taking 2 arguments, TO_TIMESTAMP_LTZ requires the first argument to be of type <VARCHAR>, <CHAR>, or <NUMERIC>."), + TestSpec.forStrategy( + "Valid three arguments", SpecificTypeStrategies.TO_TIMESTAMP_LTZ) + .inputTypes(DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()) + .expectDataType(DataTypes.TIMESTAMP_LTZ(3).nullable()), + TestSpec.forStrategy( + "Invalid three arguments", SpecificTypeStrategies.TO_TIMESTAMP_LTZ) + .inputTypes(DataTypes.STRING(), DataTypes.INT(), DataTypes.STRING()) + .expectErrorMessage( + "Unsupported argument type. When taking 3 arguments, TO_TIMESTAMP_LTZ requires all three arguments to be of type <VARCHAR> or <CHAR>."), + TestSpec.forStrategy("No arguments", SpecificTypeStrategies.TO_TIMESTAMP_LTZ) + .inputTypes() + .expectErrorMessage( + "Unsupported argument type. TO_TIMESTAMP_LTZ requires 1 to 3 arguments, but 0 were provided."), + TestSpec.forStrategy("Too many arguments", SpecificTypeStrategies.TO_TIMESTAMP_LTZ) + .inputTypes( + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING()) + .expectErrorMessage( + "Unsupported argument type. 
TO_TIMESTAMP_LTZ requires 1 to 3 arguments, but 4 were provided.")); + } +} diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/DirectConvertRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/DirectConvertRule.java index 2982fb03a6a..007bcb2a8de 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/DirectConvertRule.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/DirectConvertRule.java @@ -285,9 +285,6 @@ public class DirectConvertRule implements CallExpressionConvertRule { BuiltInFunctionDefinitions.UNIX_TIMESTAMP, FlinkSqlOperatorTable.UNIX_TIMESTAMP); definitionSqlOperatorHashMap.put( BuiltInFunctionDefinitions.TO_DATE, FlinkSqlOperatorTable.TO_DATE); - definitionSqlOperatorHashMap.put( - BuiltInFunctionDefinitions.TO_TIMESTAMP_LTZ, - FlinkSqlOperatorTable.TO_TIMESTAMP_LTZ); definitionSqlOperatorHashMap.put( BuiltInFunctionDefinitions.TO_TIMESTAMP, FlinkSqlOperatorTable.TO_TIMESTAMP); diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java index a539e1f81b4..0c76c20474f 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java @@ -810,17 +810,6 @@ public class FlinkSqlOperatorTable extends ReflectiveSqlOperatorTable { OperandTypes.family(SqlTypeFamily.CHARACTER, SqlTypeFamily.CHARACTER)), SqlFunctionCategory.TIMEDATE); - public static final SqlFunction TO_TIMESTAMP_LTZ = - new SqlFunction( - "TO_TIMESTAMP_LTZ", - SqlKind.OTHER_FUNCTION, - 
ReturnTypes.cascade( - ReturnTypes.explicit(SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE, 3), - SqlTypeTransforms.FORCE_NULLABLE), - null, - OperandTypes.family(SqlTypeFamily.NUMERIC, SqlTypeFamily.INTEGER), - SqlFunctionCategory.TIMEDATE); - public static final SqlFunction TO_DATE = new SqlFunction( "TO_DATE", diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala index df70f7a8c97..752d3b82a0d 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala @@ -23,7 +23,6 @@ import org.apache.flink.table.data.binary.{BinaryStringData, BinaryStringDataUti import org.apache.flink.table.functions.SqlLikeUtils import org.apache.flink.table.runtime.functions._ import org.apache.flink.table.runtime.functions.SqlJsonUtils.JsonQueryReturnType -import org.apache.flink.table.types.logical.LogicalTypeRoot import org.apache.flink.table.utils.DateTimeUtils import org.apache.flink.table.utils.DateTimeUtils.TimeUnitRange @@ -326,18 +325,6 @@ object BuiltInMethods { classOf[Long], classOf[TimeZone]) - val LONG_TO_TIMESTAMP_LTZ_WITH_PRECISION = - Types.lookupMethod(classOf[DateTimeUtils], "toTimestampData", classOf[Long], classOf[Int]) - - val DOUBLE_TO_TIMESTAMP_LTZ_WITH_PRECISION = - Types.lookupMethod(classOf[DateTimeUtils], "toTimestampData", classOf[Double], classOf[Int]) - - val DECIMAL_TO_TIMESTAMP_LTZ_WITH_PRECISION = Types.lookupMethod( - classOf[DateTimeUtils], - "toTimestampData", - classOf[DecimalData], - classOf[Int]) - val STRING_TO_TIMESTAMP = Types.lookupMethod(classOf[DateTimeUtils], "parseTimestampData", classOf[String]) diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/FunctionGenerator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/FunctionGenerator.scala index 53db45fe7c2..deb931c9654 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/FunctionGenerator.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/FunctionGenerator.scala @@ -402,27 +402,6 @@ class FunctionGenerator private (tableConfig: ReadableConfig) { addSqlFunction(HASH_CODE, Seq(DECIMAL), new HashCodeCallGen()) - INTEGRAL_TYPES.foreach( - dt => { - addSqlFunctionMethod( - TO_TIMESTAMP_LTZ, - Seq(dt, INTEGER), - BuiltInMethods.LONG_TO_TIMESTAMP_LTZ_WITH_PRECISION) - }) - - FRACTIONAL_TYPES.foreach( - dt => { - addSqlFunctionMethod( - TO_TIMESTAMP_LTZ, - Seq(dt, INTEGER), - BuiltInMethods.DOUBLE_TO_TIMESTAMP_LTZ_WITH_PRECISION) - }) - - addSqlFunctionMethod( - TO_TIMESTAMP_LTZ, - Seq(DECIMAL, INTEGER), - BuiltInMethods.DECIMAL_TO_TIMESTAMP_LTZ_WITH_PRECISION) - INTEGRAL_TYPES.foreach( dt => addSqlFunctionMethod(FROM_UNIXTIME, Seq(dt), BuiltInMethods.FROM_UNIXTIME)) diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala index 75f25eb610c..073c9e646fe 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala @@ -21,7 +21,7 @@ import org.apache.flink.table.api.DataTypes import org.apache.flink.table.data.util.DataFormatConverters import org.apache.flink.table.planner.codegen.{CodeGeneratorContext, GeneratedExpression} import 
org.apache.flink.table.planner.codegen.CodeGenUtils._ -import org.apache.flink.table.planner.codegen.GenerateUtils.{generateCallIfArgsNotNull, generateCallIfArgsNullable, generateNonNullField, generateNullLiteral, generateStringResultCallIfArgsNotNull} +import org.apache.flink.table.planner.codegen.GenerateUtils._ import org.apache.flink.table.planner.codegen.calls.ScalarOperatorGens._ import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable._ import org.apache.flink.table.planner.functions.sql.SqlDefaultOperator diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/TimeFunctionsITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/TimeFunctionsITCase.java index 8913d23c318..fab328c658b 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/TimeFunctionsITCase.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/TimeFunctionsITCase.java @@ -18,7 +18,9 @@ package org.apache.flink.table.planner.functions; +import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.JsonExistsOnError; +import org.apache.flink.table.data.DecimalDataUtils; import org.apache.flink.table.expressions.TimeIntervalUnit; import org.apache.flink.table.functions.BuiltInFunctionDefinitions; @@ -27,6 +29,8 @@ import java.time.Instant; import java.time.LocalDate; import java.time.LocalDateTime; import java.time.LocalTime; +import java.time.ZoneId; +import java.time.ZoneOffset; import java.time.format.DateTimeFormatter; import java.util.stream.Stream; @@ -34,6 +38,8 @@ import static org.apache.flink.table.api.DataTypes.BIGINT; import static org.apache.flink.table.api.DataTypes.BOOLEAN; import static org.apache.flink.table.api.DataTypes.DATE; import static org.apache.flink.table.api.DataTypes.DAY; +import static org.apache.flink.table.api.DataTypes.DOUBLE; +import static 
org.apache.flink.table.api.DataTypes.FLOAT; import static org.apache.flink.table.api.DataTypes.HOUR; import static org.apache.flink.table.api.DataTypes.INT; import static org.apache.flink.table.api.DataTypes.INTERVAL; @@ -44,7 +50,10 @@ import static org.apache.flink.table.api.DataTypes.TIMESTAMP; import static org.apache.flink.table.api.DataTypes.TIMESTAMP_LTZ; import static org.apache.flink.table.api.Expressions.$; import static org.apache.flink.table.api.Expressions.call; +import static org.apache.flink.table.api.Expressions.lit; import static org.apache.flink.table.api.Expressions.temporalOverlaps; +import static org.apache.flink.table.api.Expressions.toTimestampLtz; +import static org.apache.flink.table.planner.expressions.ExpressionBuilder.literal; /** Test time-related built-in functions. */ class TimeFunctionsITCase extends BuiltInFunctionTestBase { @@ -58,7 +67,8 @@ class TimeFunctionsITCase extends BuiltInFunctionTestBase { extractTestCases(), temporalOverlapsTestCases(), ceilTestCases(), - floorTestCases()) + floorTestCases(), + toTimestampLtzTestCases()) .flatMap(s -> s); } @@ -804,4 +814,197 @@ class TimeFunctionsITCase extends BuiltInFunctionTestBase { .format(TIMESTAMP_FORMATTER), STRING().nullable())); } + + private Stream<TestSetSpec> toTimestampLtzTestCases() { + return Stream.of( + TestSetSpec.forFunction(BuiltInFunctionDefinitions.TO_TIMESTAMP_LTZ) + .onFieldsWithData( + 100, + 1234, + -100, + DecimalDataUtils.castFrom(-Double.MAX_VALUE, 38, 18), + 100.01) + .andDataTypes( + DOUBLE(), BIGINT(), BIGINT(), DataTypes.DECIMAL(38, 18), FLOAT()) + .testResult( + toTimestampLtz($("f0")), + "TO_TIMESTAMP_LTZ(f0)", + LocalDateTime.of(1970, 1, 1, 0, 0, 0, 100000000) + .atZone(ZoneOffset.UTC) + .toInstant(), + TIMESTAMP_LTZ(3).nullable()) + .testResult( + toTimestampLtz($("f1"), literal(3)), + "TO_TIMESTAMP_LTZ(f1, 3)", + LocalDateTime.of(1970, 1, 1, 0, 0, 1, 234000000) + .atZone(ZoneOffset.UTC) + .toInstant(), + TIMESTAMP_LTZ(3).nullable()) + .testResult( 
+ toTimestampLtz($("f2"), literal(0)), + "TO_TIMESTAMP_LTZ(f2, 0)", + LocalDateTime.of(1969, 12, 31, 23, 58, 20) + .atZone(ZoneOffset.UTC) + .toInstant(), + TIMESTAMP_LTZ(3).nullable()) + .testResult( + toTimestampLtz($("f3"), literal(0)), + "TO_TIMESTAMP_LTZ(-" + Double.MAX_VALUE + ", 0)", + null, + TIMESTAMP_LTZ(3).nullable()) + .testResult( + toTimestampLtz($("f4"), literal(3)), + "TO_TIMESTAMP_LTZ(f4, 3)", + LocalDateTime.of(1970, 1, 1, 0, 0, 0, 100000000) + .atZone(ZoneOffset.UTC) + .toInstant(), + TIMESTAMP_LTZ(3).nullable()) + .testResult( + toTimestampLtz("2023-01-01 00:00:00"), + "TO_TIMESTAMP_LTZ('2023-01-01 00:00:00')", + LocalDateTime.of(2023, 1, 1, 0, 0, 0) + .atZone(ZoneOffset.UTC) + .toInstant(), + TIMESTAMP_LTZ(3).nullable()) + .testResult( + toTimestampLtz("01/01/2023 00:00:00", "dd/MM/yyyy HH:mm:ss"), + "TO_TIMESTAMP_LTZ('01/01/2023 00:00:00', 'dd/MM/yyyy HH:mm:ss')", + LocalDateTime.of(2023, 1, 1, 0, 0, 0) + .atZone(ZoneOffset.UTC) + .toInstant(), + TIMESTAMP_LTZ(3).nullable()) + .testResult( + toTimestampLtz("1970-01-01 00:00:00.123456789"), + "TO_TIMESTAMP_LTZ('1970-01-01 00:00:00.123456789')", + LocalDateTime.of(1970, 1, 1, 0, 0, 0, 123000000) + .atZone(ZoneOffset.UTC) + .toInstant(), + TIMESTAMP_LTZ(3).nullable()) + .testResult( + toTimestampLtz( + "1970-01-01 00:00:00.12345", "yyyy-MM-dd HH:mm:ss.SSSSS"), + "TO_TIMESTAMP_LTZ('1970-01-01 00:00:00.12345', 'yyyy-MM-dd HH:mm:ss.SSSSS')", + LocalDateTime.of(1970, 1, 1, 0, 0, 0, 123000000) + .atZone(ZoneOffset.UTC) + .toInstant(), + TIMESTAMP_LTZ(3).nullable()) + .testResult( + toTimestampLtz("20000202 59:59.1234567", "yyyyMMdd mm:ss.SSSSSSS"), + "TO_TIMESTAMP_LTZ('20000202 59:59.1234567', 'yyyyMMdd mm:ss.SSSSSSS')", + LocalDateTime.of(2000, 2, 2, 0, 59, 59, 123000000) + .atZone(ZoneOffset.UTC) + .toInstant(), + TIMESTAMP_LTZ(3).nullable()) + .testResult( + toTimestampLtz("1234567", "SSSSSSS"), + "TO_TIMESTAMP_LTZ('1234567', 'SSSSSSS')", + LocalDateTime.of(1970, 1, 1, 0, 0, 0, 123000000) + 
.atZone(ZoneOffset.UTC) + .toInstant(), + TIMESTAMP_LTZ(3).nullable()) + .testResult( + toTimestampLtz( + "2017-09-15 00:00:00.12345", "yyyy-MM-dd HH:mm:ss.SSS"), + "TO_TIMESTAMP_LTZ('2017-09-15 00:00:00.12345', 'yyyy-MM-dd HH:mm:ss.SSS')", + LocalDateTime.of(2017, 9, 15, 0, 0, 0, 123000000) + .atZone(ZoneOffset.UTC) + .toInstant(), + TIMESTAMP_LTZ(3).nullable()) + .testResult( + toTimestampLtz( + "2023-01-01 00:00:00", + "yyyy-MM-dd HH:mm:ss", + "Asia/Shanghai"), + "TO_TIMESTAMP_LTZ('2023-01-01 00:00:00', 'yyyy-MM-dd HH:mm:ss', 'Asia/Shanghai')", + LocalDateTime.of(2023, 1, 1, 0, 0, 0) + .atZone(ZoneId.of("Asia/Shanghai")) + .toInstant(), + TIMESTAMP_LTZ(3).nullable()) + .testResult( + toTimestampLtz("2023-01-01 00:00:00", "yyyy-MM-dd HH:mm:ss", "UTC"), + "TO_TIMESTAMP_LTZ('2023-01-01 00:00:00', 'yyyy-MM-dd HH:mm:ss', 'UTC')", + LocalDateTime.of(2023, 1, 1, 0, 0, 0) + .atZone(ZoneOffset.UTC) + .toInstant(), + TIMESTAMP_LTZ(3).nullable()) + .testResult( + toTimestampLtz( + "01/01/2023 08:00:00", + "dd/MM/yyyy HH:mm:ss", + "America/Los_Angeles"), + "TO_TIMESTAMP_LTZ('01/01/2023 08:00:00', 'dd/MM/yyyy HH:mm:ss', 'America/Los_Angeles')", + LocalDateTime.of(2023, 1, 1, 8, 0, 0) + .atZone(ZoneId.of("America/Los_Angeles")) + .toInstant(), + TIMESTAMP_LTZ(3).nullable()) + .testResult( + toTimestampLtz( + "01/01/2023 08:00:00", + literal("yyyy-MM-dd HH:mm:ss"), + literal("un-parsable timezone")), + "TO_TIMESTAMP_LTZ('01/01/2023 08:00:00', 'yyyy-MM-dd HH:mm:ss', 'un-parsable timezone')", + null, + TIMESTAMP_LTZ(3).nullable()) + .testResult( + toTimestampLtz( + "01/01/2023 08:00:00", + literal("un-parsable format"), + literal("UTC")), + "TO_TIMESTAMP_LTZ('01/01/2023 08:00:00', 'un-parsable format', 'UTC')", + null, + TIMESTAMP_LTZ(3).nullable()) + .testResult( + toTimestampLtz( + "un-parsable timestamp", + literal("yyyy-MM-dd HH:mm:ss"), + literal("UTC")), + "TO_TIMESTAMP_LTZ('un-parsable timestamp', 'yyyy-MM-dd HH:mm:ss', 'UTC')", + null, + TIMESTAMP_LTZ(3).nullable()) + 
.testResult( + toTimestampLtz(lit(123L), lit(null, DataTypes.INT())), + "TO_TIMESTAMP_LTZ(123, CAST(NULL AS INTEGER))", + null, + TIMESTAMP_LTZ(3).nullable()) + .testResult( + toTimestampLtz(lit(null, DataTypes.INT()), 3), + "TO_TIMESTAMP_LTZ(123, CAST(NULL AS INTEGER))", + null, + TIMESTAMP_LTZ(3).nullable()) + .testResult( + toTimestampLtz(null), + "TO_TIMESTAMP_LTZ(NULL)", + null, + TIMESTAMP_LTZ(3).nullable()) + .testResult( + toTimestampLtz(null, "yyyy-MM-dd HH:mm:ss.SSS"), + "TO_TIMESTAMP_LTZ(NULL, 'yyyy-MM-dd HH:mm:ss.SSS')", + null, + TIMESTAMP_LTZ(3).nullable()) + .testResult( + toTimestampLtz("1970-01-01 00:00:00.12345", null), + "TO_TIMESTAMP_LTZ('1970-01-01 00:00:00.12345', NULL)", + null, + TIMESTAMP_LTZ(3).nullable()) + .testResult( + toTimestampLtz(null, "dd/MM/yyyy HH:mm:ss", "America/Los_Angeles"), + "TO_TIMESTAMP_LTZ(NULL, 'dd/MM/yyyy HH:mm:ss', 'America/Los_Angeles')", + null, + TIMESTAMP_LTZ(3).nullable()) + .testResult( + toTimestampLtz("2023-01-01 00:00:00", null, "America/Los_Angeles"), + "TO_TIMESTAMP_LTZ('2023-01-01 00:00:00', NULL, 'America/Los_Angeles')", + null, + TIMESTAMP_LTZ(3).nullable()) + .testResult( + toTimestampLtz("2023-01-01 00:00:00", "dd/MM/yyyy HH:mm:ss", null), + "TO_TIMESTAMP_LTZ('2023-01-01 00:00:00', 'dd/MM/yyyy HH:mm:ss', NULL)", + null, + TIMESTAMP_LTZ(3).nullable()) + .testResult( + toTimestampLtz(null), + "TO_TIMESTAMP_LTZ(NULL)", + null, + TIMESTAMP_LTZ(3).nullable())); + } } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/TimeTravelTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/TimeTravelTest.java index 89acd62c5d6..449685ade3c 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/TimeTravelTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/TimeTravelTest.java @@ -133,7 +133,7 @@ class 
TimeTravelTest extends TableTestBase { + " t1 FOR SYSTEM_TIME AS OF TO_TIMESTAMP_LTZ (0, 3)")) .isInstanceOf(ValidationException.class) .hasMessageContaining( - "Unsupported time travel expression: TO_TIMESTAMP_LTZ(0, 3) for the expression can not be reduced to a constant by Flink."); + "SQL validation failed. Unsupported time travel expression: `TO_TIMESTAMP_LTZ`(0, 3) for the expression can not be reduced to a constant by Flink."); assertThatThrownBy( () -> diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/TemporalTypesTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/TemporalTypesTest.scala index 4fba6e34a08..351a2d4095c 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/TemporalTypesTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/TemporalTypesTest.scala @@ -18,6 +18,7 @@ package org.apache.flink.table.planner.expressions import org.apache.flink.table.api._ +import org.apache.flink.table.api.Expressions.toTimestampLtz import org.apache.flink.table.expressions.TimeIntervalUnit import org.apache.flink.table.planner.codegen.CodeGenException import org.apache.flink.table.planner.expressions.utils.ExpressionTestBase @@ -1282,11 +1283,6 @@ class TemporalTypesTest extends ExpressionTestBase { s"TO_TIMESTAMP_LTZ(253402300800000, 3)", "NULL") - // test invalid number of arguments - testExpectedSqlException( - "TO_TIMESTAMP_LTZ(123)", - "Invalid number of arguments to function 'TO_TIMESTAMP_LTZ'. 
Was expecting 2 arguments") - // invalid precision testExpectedAllApisException( toTimestampLtz(12, 1), @@ -1308,29 +1304,28 @@ class TemporalTypesTest extends ExpressionTestBase { // invalid type for the first input testExpectedSqlException( "TO_TIMESTAMP_LTZ('test_string_type', 0)", - "Cannot apply 'TO_TIMESTAMP_LTZ' to arguments of type" + - " 'TO_TIMESTAMP_LTZ(<CHAR(16)>, <INTEGER>)'. Supported form(s):" + - " 'TO_TIMESTAMP_LTZ(<NUMERIC>, <INTEGER>)'", + "SQL validation failed. Invalid function call:\n" + + "TO_TIMESTAMP_LTZ(CHAR(16) NOT NULL, INT NOT NULL)", classOf[ValidationException] ) + testExpectedTableApiException( toTimestampLtz("test_string_type", 0), - "Unsupported argument type. " + - "Expected type of family 'NUMERIC' but actual type was 'CHAR(16) NOT NULL'" + "Invalid function call:\n" + + "TO_TIMESTAMP_LTZ(CHAR(16) NOT NULL, INT NOT NULL)" ) // invalid type for the second input testExpectedSqlException( "TO_TIMESTAMP_LTZ(123, 'test_string_type')", - "Cannot apply 'TO_TIMESTAMP_LTZ' to arguments of type" + - " 'TO_TIMESTAMP_LTZ(<INTEGER>, <CHAR(16)>)'. Supported form(s):" + - " 'TO_TIMESTAMP_LTZ(<NUMERIC>, <INTEGER>)'" + "SQL validation failed. Invalid function call:\n" + + "TO_TIMESTAMP_LTZ(INT NOT NULL, CHAR(16) NOT NULL)" ) testExpectedTableApiException( toTimestampLtz(123, "test_string_type"), - "Unsupported argument type. 
" + - "Expected type of family 'INTEGER_NUMERIC' but actual type was 'CHAR(16) NOT NULL'" + "Invalid function call:\n" + + "TO_TIMESTAMP_LTZ(INT NOT NULL, CHAR(16) NOT NULL)" ) } diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ToTimestampLtzFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ToTimestampLtzFunction.java new file mode 100644 index 00000000000..e1b8478c210 --- /dev/null +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ToTimestampLtzFunction.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.functions.scalar; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.functions.SpecializedFunction; +import org.apache.flink.table.utils.DateTimeUtils; + +import javax.annotation.Nullable; + +import java.time.DateTimeException; +import java.time.ZoneId; +import java.time.ZonedDateTime; + +import static org.apache.flink.table.utils.DateTimeUtils.parseTimestampData; + +/** + * Implementation of {@link BuiltInFunctionDefinitions#TO_TIMESTAMP_LTZ}. + * + * <p>A function that converts various time formats to TIMESTAMP_LTZ type. + * + * <p>Supported function signatures: + * + * <ul> + * <li>{@code TO_TIMESTAMP_LTZ(numeric)} -> TIMESTAMP_LTZ(3) <br> + * Converts numeric epoch time in milliseconds to timestamp with local timezone + * <li>{@code TO_TIMESTAMP_LTZ(numeric, precision)} -> TIMESTAMP_LTZ(precision) <br> + * Converts numeric epoch time to timestamp with specified precision (0 as seconds, 3 as + * milliseconds) + * <li>{@code TO_TIMESTAMP_LTZ(timestamp)} -> TIMESTAMP_LTZ(3) <br> + * Parses string timestamp using default format 'yyyy-MM-dd HH:mm:ss' + * <li>{@code TO_TIMESTAMP_LTZ(timestamp, format)} -> TIMESTAMP_LTZ(3) <br> + * Parses string timestamp using input string of format + * <li>{@code TO_TIMESTAMP_LTZ(timestamp, format, timezone)} -> TIMESTAMP_LTZ(3) <br> + * Parses string timestamp using input strings of format and timezone + * </ul> + * + * <p>Example: + * + * <pre>{@code + * TO_TIMESTAMP_LTZ('2023-01-01 10:00:00') // Parses string using default format + * TO_TIMESTAMP_LTZ(1234567890123) // Converts epoch milliseconds + * TO_TIMESTAMP_LTZ(1234567890, 0) // Converts epoch seconds + * TO_TIMESTAMP_LTZ(1234567890123, 3) // Converts epoch milliseconds + * 
TO_TIMESTAMP_LTZ('2023-01-01T10:00:00', 'yyyy-MM-dd\'T\'HH:mm:ss') // Parses string using input format + * TO_TIMESTAMP_LTZ('2023-01-01 10:00:00', 'yyyy-MM-dd HH:mm:ss', 'UTC') // Parses string using input format and timezone + * }</pre> */ +@Internal +public class ToTimestampLtzFunction extends BuiltInScalarFunction { + + private static final int DEFAULT_PRECISION = 3; + + public ToTimestampLtzFunction(SpecializedFunction.SpecializedContext context) { + super(BuiltInFunctionDefinitions.TO_TIMESTAMP_LTZ, context); + } + + public @Nullable TimestampData eval(Number epoch, Integer precision) { + if (epoch == null || precision == null) { + return null; + } + if (epoch instanceof Float || epoch instanceof Double) { + return DateTimeUtils.toTimestampData(epoch.doubleValue(), precision); + } + return DateTimeUtils.toTimestampData(epoch.longValue(), precision); + } + + public @Nullable TimestampData eval(DecimalData epoch, Integer precision) { + if (epoch == null || precision == null) { + return null; + } + + return DateTimeUtils.toTimestampData(epoch, precision); + } + + public @Nullable TimestampData eval(Number epoch) { + return eval(epoch, DEFAULT_PRECISION); + } + + public @Nullable TimestampData eval(DecimalData epoch) { + return eval(epoch, DEFAULT_PRECISION); + } + + public @Nullable TimestampData eval(StringData timestamp) { + if (timestamp == null) { + return null; + } + + return parseTimestampData(timestamp.toString()); + } + + public @Nullable TimestampData eval(StringData timestamp, StringData format) { + if (timestamp == null || format == null) { + return null; + } + + return parseTimestampData(timestamp.toString(), format.toString()); + } + + public @Nullable TimestampData eval( + StringData dateStr, StringData format, StringData timezone) { + if (dateStr == null || format == null || timezone == null) { + return null; + } + + TimestampData ts =
parseTimestampData(dateStr.toString(), format.toString()); + if (ts == null) { + return null; + } + + try { + ZonedDateTime zoneDate = ts.toLocalDateTime().atZone(ZoneId.of(timezone.toString())); + return TimestampData.fromInstant(zoneDate.toInstant()); + } catch (DateTimeException e) { + return null; + } + } +}