This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new c730292d645 [FLINK-36862][table] Implement additional 
`TO_TIMESTAMP_LTZ` functions
c730292d645 is described below

commit c730292d645fc7ea24659f791eb8599b2c849408
Author: Yiyu Tian <120529776+yiyuti...@users.noreply.github.com>
AuthorDate: Sun Jan 12 12:30:43 2025 -0800

    [FLINK-36862][table] Implement additional `TO_TIMESTAMP_LTZ` functions
---
 docs/data/sql_functions.yml                        |   7 +-
 docs/data/sql_functions_zh.yml                     |  11 +-
 flink-python/pyflink/table/expressions.py          |  48 ++++-
 .../pyflink/table/tests/test_expression.py         |   9 +-
 .../org/apache/flink/table/api/Expressions.java    |  55 ++++++
 .../table/api/ImplicitExpressionConversions.scala  |  11 --
 .../functions/BuiltInFunctionDefinitions.java      |  21 ++-
 .../strategies/ToTimestampLtzTypeStrategy.java     |  65 ++++++-
 .../apache/flink/table/utils/DateTimeUtils.java    |  10 +-
 .../strategies/ToTimestampLtzTypeStrategyTest.java |  98 ++++++++++
 .../expressions/converter/DirectConvertRule.java   |   3 -
 .../functions/sql/FlinkSqlOperatorTable.java       |  11 --
 .../planner/codegen/calls/BuiltInMethods.scala     |  13 --
 .../planner/codegen/calls/FunctionGenerator.scala  |  21 ---
 .../planner/codegen/calls/StringCallGen.scala      |   2 +-
 .../planner/functions/TimeFunctionsITCase.java     | 205 ++++++++++++++++++++-
 .../planner/plan/batch/sql/TimeTravelTest.java     |   2 +-
 .../planner/expressions/TemporalTypesTest.scala    |  25 +--
 .../functions/scalar/ToTimestampLtzFunction.java   | 139 ++++++++++++++
 19 files changed, 650 insertions(+), 106 deletions(-)

diff --git a/docs/data/sql_functions.yml b/docs/data/sql_functions.yml
index eda6d3de753..2ff7353ebd2 100644
--- a/docs/data/sql_functions.yml
+++ b/docs/data/sql_functions.yml
@@ -679,9 +679,12 @@ temporal:
   - sql: TO_DATE(string1[, string2])
     table: toDate(STRING1[, STRING2])
     description: Converts a date string string1 with format string2 (by 
default 'yyyy-MM-dd') to a date.
-  - sql: TO_TIMESTAMP_LTZ(numeric, precision)
+  - sql: TO_TIMESTAMP_LTZ(numeric[, precision])
     table: toTimestampLtz(NUMERIC, PRECISION)
-    description: "Converts a epoch seconds or epoch milliseconds to a 
TIMESTAMP_LTZ, the valid precision is 0 or 3, the 0 represents 
TO_TIMESTAMP_LTZ(epochSeconds, 0), the 3 represents 
TO_TIMESTAMP_LTZ(epochMilliseconds, 3)."
+    description: Converts an epoch seconds or epoch milliseconds to a 
TIMESTAMP_LTZ, the valid precision is 0 or 3, the 0 represents 
TO_TIMESTAMP_LTZ(epochSeconds, 0), the 3 represents 
TO_TIMESTAMP_LTZ(epochMilliseconds, 3). If no precision is provided, the 
default precision is 3. If any input is null, the function will return null.
+  - sql: TO_TIMESTAMP_LTZ(string1[, string2[, string3]])
+    table: toTimestampLtz(STRING1[, STRING2[, STRING3]])
+    description: Converts a timestamp string string1 with format string2 (by 
default 'yyyy-MM-dd HH:mm:ss.SSS') in time zone string3 (by default 'UTC') to a 
TIMESTAMP_LTZ. If any input is null, the function will return null.
   - sql: TO_TIMESTAMP(string1[, string2])
     table: toTimestamp(STRING1[, STRING2])
     description: "Converts date time string string1 with format string2 (by 
default: 'yyyy-MM-dd HH:mm:ss') to a timestamp, without time zone."
diff --git a/docs/data/sql_functions_zh.yml b/docs/data/sql_functions_zh.yml
index d0250acc6f6..06f5ce34ef9 100644
--- a/docs/data/sql_functions_zh.yml
+++ b/docs/data/sql_functions_zh.yml
@@ -805,11 +805,12 @@ temporal:
   - sql: TO_DATE(string1[, string2])
     table: toDate(STRING1[, STRING2])
     description: 将格式为 string2(默认为 'yyyy-MM-dd')的字符串 string1 转换为日期。
-  - sql: TO_TIMESTAMP_LTZ(numeric, precision)
-    table: toTimestampLtz(numeric, PRECISION)
-    description: |
-      将纪元秒或纪元毫秒转换为 TIMESTAMP_LTZ,有效精度为 0 或 3,0 代表 
`TO_TIMESTAMP_LTZ(epochSeconds, 0)`,
-      3 代表` TO_TIMESTAMP_LTZ(epochMilliseconds, 3)`。
+  - sql: TO_TIMESTAMP_LTZ(numeric[, precision])
+    table: toTimestampLtz(NUMERIC, PRECISION)
+    description: Converts an epoch seconds or epoch milliseconds to a 
TIMESTAMP_LTZ, the valid precision is 0 or 3, the 0 represents 
TO_TIMESTAMP_LTZ(epochSeconds, 0), the 3 represents 
TO_TIMESTAMP_LTZ(epochMilliseconds, 3). If no precision is provided, the 
default precision is 3. If any input is null, the function will return null.
+  - sql: TO_TIMESTAMP_LTZ(string1[, string2[, string3]])
+    table: toTimestampLtz(STRING1[, STRING2[, STRING3]])
+    description: Converts a timestamp string string1 with format string2 (by 
default 'yyyy-MM-dd HH:mm:ss.SSS') in time zone string3 (by default 'UTC') to a 
TIMESTAMP_LTZ. If any input is null, the function will return null.
   - sql: TO_TIMESTAMP(string1[, string2])
     table: toTimestamp(STRING1[, STRING2])
     description: 将格式为 string2(默认为:'yyyy-MM-dd HH:mm:ss')的字符串 string1 转换为 
timestamp,不带时区。
diff --git a/flink-python/pyflink/table/expressions.py 
b/flink-python/pyflink/table/expressions.py
index 3b326cb4641..0e73fc78369 100644
--- a/flink-python/pyflink/table/expressions.py
+++ b/flink-python/pyflink/table/expressions.py
@@ -306,19 +306,47 @@ def to_timestamp(timestamp_str: Union[str, 
Expression[str]],
         return _binary_op("toTimestamp", timestamp_str, format)
 
 
-def to_timestamp_ltz(numeric_epoch_time, precision) -> Expression:
-    """
-    Converts a numeric type epoch time to TIMESTAMP_LTZ.
+def to_timestamp_ltz(*args) -> Expression:
+    """
+    Converts a value to a timestamp with local time zone.
+
+    Supported functions:
+    1. to_timestamp_ltz(Numeric) -> DataTypes.TIMESTAMP_LTZ
+    Converts a numeric value of epoch milliseconds to a TIMESTAMP_LTZ. The 
default precision is 3.
+    2. to_timestamp_ltz(Numeric, Integer) -> DataTypes.TIMESTAMP_LTZ
+    Converts a numeric value of epoch seconds or epoch milliseconds to a 
TIMESTAMP_LTZ.
+    Valid precisions are 0 or 3.
+    3. to_timestamp_ltz(String) -> DataTypes.TIMESTAMP_LTZ
+    Converts a timestamp string using default format 'yyyy-MM-dd HH:mm:ss.SSS' 
to a TIMESTAMP_LTZ.
+    4. to_timestamp_ltz(String, String) -> DataTypes.TIMESTAMP_LTZ
+    Converts a timestamp string using format (default 'yyyy-MM-dd 
HH:mm:ss.SSS') to a TIMESTAMP_LTZ.
+    5. to_timestamp_ltz(String, String, String) -> DataTypes.TIMESTAMP_LTZ
+    Converts a timestamp string string1 using format string2 (default 
'yyyy-MM-dd HH:mm:ss.SSS')
+    in time zone string3 (default 'UTC') to a TIMESTAMP_LTZ.
+    Supports any timezone that is available in Java's TimeZone database.
 
-    The supported precision is 0 or 3:
-    0 means the numericEpochTime is in second.
-    3 means the numericEpochTime is in millisecond.
+    Example:
+    ::
 
-    :param numeric_epoch_time: The epoch time with numeric type
-    :param precision: The precision to indicate the epoch time is in second or 
millisecond
-    :return: The timestamp value with TIMESTAMP_LTZ type.
+        >>> table.select(to_timestamp_ltz(100))  # numeric with default 
precision
+        >>> table.select(to_timestamp_ltz(100, 0))  # numeric with second 
precision
+        >>> table.select(to_timestamp_ltz(100, 3))  # numeric with millisecond 
precision
+        >>> table.select(to_timestamp_ltz("2023-01-01 00:00:00"))  # string 
with default format
+        >>> table.select(to_timestamp_ltz("01/01/2023", "MM/dd/yyyy"))  # 
string with format
+        >>> table.select(to_timestamp_ltz("2023-01-01 00:00:00",
+                                        "yyyy-MM-dd HH:mm:ss",
+                                        "UTC"))  # string with format and 
timezone
     """
-    return _binary_op("toTimestampLtz", numeric_epoch_time, precision)
+    if len(args) == 1:
+        return _unary_op("toTimestampLtz", lit(args[0]))
+
+    # For two arguments case (numeric + precision or string + format)
+    elif len(args) == 2:
+        return _binary_op("toTimestampLtz", lit(args[0]), lit(args[1]))
+
+    # For three arguments case (string + format + timezone)
+    else:
+        return _ternary_op("toTimestampLtz", lit(args[0]), lit(args[1]), 
lit(args[2]))
 
 
 def temporal_overlaps(left_time_point,
diff --git a/flink-python/pyflink/table/tests/test_expression.py 
b/flink-python/pyflink/table/tests/test_expression.py
index 2ea4bbdffbd..ef5a763f31e 100644
--- a/flink-python/pyflink/table/tests/test_expression.py
+++ b/flink-python/pyflink/table/tests/test_expression.py
@@ -285,7 +285,14 @@ class PyFlinkBatchExpressionTests(PyFlinkTestCase):
         self.assertEqual("toDate('2018-03-18')", str(to_date('2018-03-18')))
         self.assertEqual("toDate('2018-03-18', 'yyyy-MM-dd')",
                          str(to_date('2018-03-18', 'yyyy-MM-dd')))
-        self.assertEqual('toTimestampLtz(123, 0)', str(to_timestamp_ltz(123, 
0)))
+        self.assertEqual('TO_TIMESTAMP_LTZ(100)', str(to_timestamp_ltz(100)))
+        self.assertEqual("TO_TIMESTAMP_LTZ('2023-01-01 00:00:00')",
+                         str(to_timestamp_ltz('2023-01-01 00:00:00')))
+        self.assertEqual("TO_TIMESTAMP_LTZ('01/01/2023 00:00:00', 'MM/dd/yyyy 
HH:mm:ss')",
+                         str(to_timestamp_ltz("01/01/2023 00:00:00", 
"MM/dd/yyyy HH:mm:ss")))
+        self.assertEqual("TO_TIMESTAMP_LTZ('2023-01-01 00:00:00', 'yyyy-MM-dd 
HH:mm:ss', 'UTC')",
+                         str(to_timestamp_ltz("2023-01-01 00:00:00", 
"yyyy-MM-dd HH:mm:ss", "UTC")))
+        self.assertEqual("TO_TIMESTAMP_LTZ(123, 0)", str(to_timestamp_ltz(123, 
0)))
         self.assertEqual("toTimestamp('1970-01-01 08:01:40')",
                          str(to_timestamp('1970-01-01 08:01:40')))
         self.assertEqual("toTimestamp('1970-01-01 08:01:40', 'yyyy-MM-dd 
HH:mm:ss')",
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java
index 9b687ae65a0..84365f5f5ed 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java
@@ -366,6 +366,61 @@ public final class Expressions {
         return apiCall(BuiltInFunctionDefinitions.TO_TIMESTAMP_LTZ, 
numericEpochTime, precision);
     }
 
+    /**
+     * Converts the given time string with the specified format to {@link
+     * DataTypes#TIMESTAMP_LTZ(int)}.
+     *
+     * @param timestampStr The timestamp string to convert.
+     * @param format The format of the string.
+     * @return The timestamp value with {@link DataTypes#TIMESTAMP_LTZ(int)} 
type.
+     */
+    public static ApiExpression toTimestampLtz(String timestampStr, String 
format) {
+        return apiCall(BuiltInFunctionDefinitions.TO_TIMESTAMP_LTZ, 
timestampStr, format);
+    }
+
+    /**
+     * Converts a timestamp to {@link DataTypes#TIMESTAMP_LTZ(int)}.
+     *
+     * <p>This method takes a string representing a timestamp and converts it 
to a TIMESTAMP_LTZ
+     * using the built-in TO_TIMESTAMP_LTZ function definition.
+     *
+     * @param timeStamp The timestamp string to be converted.
+     * @return The timestamp value with {@link DataTypes#TIMESTAMP_LTZ(int)} 
type.
+     */
+    public static ApiExpression toTimestampLtz(String timeStamp) {
+        return apiCall(BuiltInFunctionDefinitions.TO_TIMESTAMP_LTZ, timeStamp);
+    }
+
+    /**
+     * Converts a numeric type epoch time to {@link 
DataTypes#TIMESTAMP_LTZ(int)}.
+     *
+     * <p>This method takes an object representing an epoch time and converts 
it to a TIMESTAMP_LTZ
+     * using the built-in TO_TIMESTAMP_LTZ function definition.
+     *
+     * @param numericEpochTime The epoch time with numeric type.
+     * @return The timestamp value with {@link DataTypes#TIMESTAMP_LTZ(int)} 
type.
+     */
+    public static ApiExpression toTimestampLtz(Object numericEpochTime) {
+        return apiCall(BuiltInFunctionDefinitions.TO_TIMESTAMP_LTZ, 
numericEpochTime);
+    }
+
+    /**
+     * Converts a string timestamp with the custom format and timezone to 
{@link
+     * DataTypes#TIMESTAMP_LTZ(int)}.
+     *
+     * <p>The timestamp string will be parsed using the custom format and 
timezone, and converted to
+     * a TIMESTAMP_LTZ value.
+     *
+     * @param timestampStr The timestamp string to convert.
+     * @param format The format pattern to parse the timestamp string.
+     * @param timezone The timezone to use for the conversion.
+     * @return The timestamp value with {@link DataTypes#TIMESTAMP_LTZ(int)} 
type.
+     */
+    public static ApiExpression toTimestampLtz(
+            Object timestampStr, Object format, Object timezone) {
+        return apiCall(BuiltInFunctionDefinitions.TO_TIMESTAMP_LTZ, 
timestampStr, format, timezone);
+    }
+
     /**
      * Determines whether two anchored time intervals overlap. Time point and 
temporal are
      * transformed into a range defined by two time points (start, end). The 
function evaluates
diff --git 
a/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/ImplicitExpressionConversions.scala
 
b/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/ImplicitExpressionConversions.scala
index d889851064a..b1b59ed663a 100644
--- 
a/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/ImplicitExpressionConversions.scala
+++ 
b/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/ImplicitExpressionConversions.scala
@@ -519,17 +519,6 @@ trait ImplicitExpressionConversions {
     Expressions.toTimestamp(timestampStr, format)
   }
 
-  /**
-   * Converts a numeric type epoch time to [[DataTypes#TIMESTAMP_LTZ]].
-   *
-   * The supported precision is 0 or 3:
-   *   - 0 means the numericEpochTime is in second.
-   *   - 3 means the numericEpochTime is in millisecond.
-   */
-  def toTimestampLtz(numericEpochTime: Expression, precision: Expression): 
Expression = {
-    Expressions.toTimestampLtz(numericEpochTime, precision)
-  }
-
   /**
    * Determines whether two anchored time intervals overlap. Time point and 
temporal are transformed
    * into a range defined by two time points (start, end). The function 
evaluates <code>leftEnd >=
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
index ec4e4291089..2bd61bd04cb 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
@@ -2332,14 +2332,25 @@ public final class BuiltInFunctionDefinitions {
 
     public static final BuiltInFunctionDefinition TO_TIMESTAMP_LTZ =
             BuiltInFunctionDefinition.newBuilder()
-                    .name("toTimestampLtz")
-                    .sqlName("TO_TIMESTAMP_LTZ")
+                    .name("TO_TIMESTAMP_LTZ")
                     .kind(SCALAR)
                     .inputTypeStrategy(
-                            sequence(
-                                    logical(LogicalTypeFamily.NUMERIC),
-                                    logical(LogicalTypeFamily.INTEGER_NUMERIC, 
false)))
+                            or(
+                                    
sequence(logical(LogicalTypeFamily.CHARACTER_STRING)),
+                                    sequence(
+                                            
logical(LogicalTypeFamily.CHARACTER_STRING),
+                                            
logical(LogicalTypeFamily.CHARACTER_STRING)),
+                                    sequence(
+                                            
logical(LogicalTypeFamily.CHARACTER_STRING),
+                                            
logical(LogicalTypeFamily.CHARACTER_STRING),
+                                            
logical(LogicalTypeFamily.CHARACTER_STRING)),
+                                    
sequence(logical(LogicalTypeFamily.NUMERIC)),
+                                    sequence(
+                                            logical(LogicalTypeFamily.NUMERIC),
+                                            
logical(LogicalTypeFamily.INTEGER_NUMERIC))))
                     
.outputTypeStrategy(SpecificTypeStrategies.TO_TIMESTAMP_LTZ)
+                    .runtimeClass(
+                            
"org.apache.flink.table.runtime.functions.scalar.ToTimestampLtzFunction")
                     .build();
 
     public static final BuiltInFunctionDefinition TO_TIMESTAMP =
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToTimestampLtzTypeStrategy.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToTimestampLtzTypeStrategy.java
index 722dd63e51d..fe11e91e432 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToTimestampLtzTypeStrategy.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToTimestampLtzTypeStrategy.java
@@ -20,22 +20,79 @@ package org.apache.flink.table.types.inference.strategies;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.inference.CallContext;
 import org.apache.flink.table.types.inference.TypeStrategy;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
 
+import java.util.List;
 import java.util.Optional;
 
 /** Type strategy of {@code TO_TIMESTAMP_LTZ}. */
 @Internal
 public class ToTimestampLtzTypeStrategy implements TypeStrategy {
 
+    private static final int DEFAULT_PRECISION = 3;
+
     @Override
     public Optional<DataType> inferType(CallContext callContext) {
-        if (callContext.isArgumentLiteral(1)) {
-            final int precision = callContext.getArgumentValue(1, 
Integer.class).get();
-            return Optional.of(DataTypes.TIMESTAMP_LTZ(precision));
+        List<DataType> argumentTypes = callContext.getArgumentDataTypes();
+        int argCount = argumentTypes.size();
+
+        if (argCount < 1 || argCount > 3) {
+            throw new ValidationException(
+                    "Unsupported argument type. "
+                            + "TO_TIMESTAMP_LTZ requires 1 to 3 arguments, but 
"
+                            + argCount
+                            + " were provided.");
+        }
+
+        LogicalType firstType = argumentTypes.get(0).getLogicalType();
+        LogicalTypeRoot firstTypeRoot = firstType.getTypeRoot();
+
+        if (argCount == 1) {
+            if (!isCharacterType(firstTypeRoot) && 
!firstType.is(LogicalTypeFamily.NUMERIC)) {
+                throw new ValidationException(
+                        "Unsupported argument type. "
+                                + "When taking 1 argument, TO_TIMESTAMP_LTZ 
accepts an argument of type <VARCHAR>, <CHAR>, or <NUMERIC>.");
+            }
+        } else if (argCount == 2) {
+            LogicalType secondType = argumentTypes.get(1).getLogicalType();
+            LogicalTypeRoot secondTypeRoot = secondType.getTypeRoot();
+            if (firstType.is(LogicalTypeFamily.NUMERIC)) {
+                if (secondTypeRoot != LogicalTypeRoot.INTEGER) {
+                    throw new ValidationException(
+                            "Unsupported argument type. "
+                                    + "TO_TIMESTAMP_LTZ(<NUMERIC>, <INTEGER>) 
requires the second argument to be <INTEGER>.");
+                }
+            } else if (isCharacterType(firstTypeRoot)) {
+                if (!isCharacterType(secondTypeRoot)) {
+                    throw new ValidationException(
+                            "Unsupported argument type. "
+                                    + "If the first argument is of type 
<VARCHAR> or <CHAR>, TO_TIMESTAMP_LTZ requires the second argument to be of 
type <VARCHAR> or <CHAR>.");
+                }
+            } else {
+                throw new ValidationException(
+                        "Unsupported argument type. "
+                                + "When taking 2 arguments, TO_TIMESTAMP_LTZ 
requires the first argument to be of type <VARCHAR>, <CHAR>, or <NUMERIC>.");
+            }
+        } else if (argCount == 3) {
+            if (!isCharacterType(firstTypeRoot)
+                    || 
!isCharacterType(argumentTypes.get(1).getLogicalType().getTypeRoot())
+                    || 
!isCharacterType(argumentTypes.get(2).getLogicalType().getTypeRoot())) {
+                throw new ValidationException(
+                        "Unsupported argument type. "
+                                + "When taking 3 arguments, TO_TIMESTAMP_LTZ 
requires all three arguments to be of type <VARCHAR> or <CHAR>.");
+            }
         }
-        return Optional.of(DataTypes.TIMESTAMP_LTZ(3));
+
+        return 
Optional.of(DataTypes.TIMESTAMP_LTZ(DEFAULT_PRECISION).nullable());
+    }
+
+    private boolean isCharacterType(LogicalTypeRoot typeRoot) {
+        return typeRoot == LogicalTypeRoot.CHAR || typeRoot == 
LogicalTypeRoot.VARCHAR;
     }
 }
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/DateTimeUtils.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/DateTimeUtils.java
index c8d0f2e7311..4346c437bb8 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/DateTimeUtils.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/DateTimeUtils.java
@@ -142,6 +142,8 @@ public class DateTimeUtils {
                     .optionalEnd()
                     .toFormatter();
 
+    private static final Integer DEFAULT_PRECISION = 3;
+
     /**
      * A ThreadLocal cache map for SimpleDateFormat, because SimpleDateFormat 
is not thread-safe.
      * (string_format) => formatter
@@ -422,8 +424,12 @@ public class DateTimeUtils {
     }
 
     public static TimestampData parseTimestampData(String dateStr, String 
format) {
-        DateTimeFormatter formatter = DATETIME_FORMATTER_CACHE.get(format);
-
+        DateTimeFormatter formatter;
+        try {
+            formatter = DATETIME_FORMATTER_CACHE.get(format);
+        } catch (IllegalArgumentException e) {
+            return null;
+        }
         try {
             TemporalAccessor accessor = formatter.parse(dateStr);
             // Precision is hardcoded to match signature of TO_TIMESTAMP
diff --git 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/ToTimestampLtzTypeStrategyTest.java
 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/ToTimestampLtzTypeStrategyTest.java
new file mode 100644
index 00000000000..27331bb7e6c
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/ToTimestampLtzTypeStrategyTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference.strategies;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.inference.TypeStrategiesTestBase;
+
+import java.util.stream.Stream;
+
+/** Tests for {@link ToTimestampLtzTypeStrategy}. */
+class ToTimestampLtzTypeStrategyTest extends TypeStrategiesTestBase {
+
+    @Override
+    protected Stream<TestSpec> testData() {
+        return Stream.of(
+                TestSpec.forStrategy(
+                                "Valid single argument of type <VARCHAR> or 
<CHAR>",
+                                SpecificTypeStrategies.TO_TIMESTAMP_LTZ)
+                        .inputTypes(DataTypes.STRING())
+                        .expectDataType(DataTypes.TIMESTAMP_LTZ(3).nullable()),
+                TestSpec.forStrategy(
+                                "TO_TIMESTAMP_LTZ(<NUMERIC>)",
+                                SpecificTypeStrategies.TO_TIMESTAMP_LTZ)
+                        .inputTypes(DataTypes.BIGINT())
+                        .expectDataType(DataTypes.TIMESTAMP_LTZ(3).nullable()),
+                TestSpec.forStrategy(
+                                "Invalid single argument type",
+                                SpecificTypeStrategies.TO_TIMESTAMP_LTZ)
+                        .inputTypes(DataTypes.BOOLEAN())
+                        .expectErrorMessage(
+                                "Unsupported argument type. When taking 1 
argument, TO_TIMESTAMP_LTZ accepts an argument of type <VARCHAR>, <CHAR>, or 
<NUMERIC>."),
+                TestSpec.forStrategy(
+                                "TO_TIMESTAMP_LTZ(<NUMERIC>, <INTEGER>)",
+                                SpecificTypeStrategies.TO_TIMESTAMP_LTZ)
+                        .inputTypes(DataTypes.DOUBLE(), DataTypes.INT())
+                        .expectDataType(DataTypes.TIMESTAMP_LTZ(3).nullable()),
+                TestSpec.forStrategy(
+                                "Valid two arguments of <VARCHAR> or <CHAR>",
+                                SpecificTypeStrategies.TO_TIMESTAMP_LTZ)
+                        .inputTypes(DataTypes.STRING(), DataTypes.STRING())
+                        .expectDataType(DataTypes.TIMESTAMP_LTZ(3).nullable()),
+                TestSpec.forStrategy(
+                                "Invalid second argument when the first 
argument is <NUMERIC>",
+                                SpecificTypeStrategies.TO_TIMESTAMP_LTZ)
+                        .inputTypes(DataTypes.BIGINT(), DataTypes.STRING())
+                        .expectErrorMessage(
+                                "Unsupported argument type. 
TO_TIMESTAMP_LTZ(<NUMERIC>, <INTEGER>) requires the second argument to be 
<INTEGER>."),
+                TestSpec.forStrategy(
+                                "Invalid second argument when the first 
argument is <VARCHAR> or <CHAR>",
+                                SpecificTypeStrategies.TO_TIMESTAMP_LTZ)
+                        .inputTypes(DataTypes.STRING(), DataTypes.FLOAT())
+                        .expectErrorMessage(
+                                "Unsupported argument type. If the first 
argument is of type <VARCHAR> or <CHAR>, TO_TIMESTAMP_LTZ requires the second 
argument to be of type <VARCHAR> or <CHAR>."),
+                TestSpec.forStrategy(
+                                "Invalid first argument when taking 2 
arguments",
+                                SpecificTypeStrategies.TO_TIMESTAMP_LTZ)
+                        .inputTypes(DataTypes.BOOLEAN(), DataTypes.FLOAT())
+                        .expectErrorMessage(
+                                "Unsupported argument type. When taking 2 
arguments, TO_TIMESTAMP_LTZ requires the first argument to be of type 
<VARCHAR>, <CHAR>, or <NUMERIC>."),
+                TestSpec.forStrategy(
+                                "Valid three arguments", 
SpecificTypeStrategies.TO_TIMESTAMP_LTZ)
+                        .inputTypes(DataTypes.STRING(), DataTypes.STRING(), 
DataTypes.STRING())
+                        .expectDataType(DataTypes.TIMESTAMP_LTZ(3).nullable()),
+                TestSpec.forStrategy(
+                                "Invalid three arguments", 
SpecificTypeStrategies.TO_TIMESTAMP_LTZ)
+                        .inputTypes(DataTypes.STRING(), DataTypes.INT(), 
DataTypes.STRING())
+                        .expectErrorMessage(
+                                "Unsupported argument type. When taking 3 
arguments, TO_TIMESTAMP_LTZ requires all three arguments to be of type 
<VARCHAR> or <CHAR>."),
+                TestSpec.forStrategy("No arguments", 
SpecificTypeStrategies.TO_TIMESTAMP_LTZ)
+                        .inputTypes()
+                        .expectErrorMessage(
+                                "Unsupported argument type. TO_TIMESTAMP_LTZ 
requires 1 to 3 arguments, but 0 were provided."),
+                TestSpec.forStrategy("Too many arguments", 
SpecificTypeStrategies.TO_TIMESTAMP_LTZ)
+                        .inputTypes(
+                                DataTypes.STRING(),
+                                DataTypes.STRING(),
+                                DataTypes.STRING(),
+                                DataTypes.STRING())
+                        .expectErrorMessage(
+                                "Unsupported argument type. TO_TIMESTAMP_LTZ 
requires 1 to 3 arguments, but 4 were provided."));
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/DirectConvertRule.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/DirectConvertRule.java
index 2982fb03a6a..007bcb2a8de 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/DirectConvertRule.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/DirectConvertRule.java
@@ -285,9 +285,6 @@ public class DirectConvertRule implements 
CallExpressionConvertRule {
                 BuiltInFunctionDefinitions.UNIX_TIMESTAMP, 
FlinkSqlOperatorTable.UNIX_TIMESTAMP);
         definitionSqlOperatorHashMap.put(
                 BuiltInFunctionDefinitions.TO_DATE, 
FlinkSqlOperatorTable.TO_DATE);
-        definitionSqlOperatorHashMap.put(
-                BuiltInFunctionDefinitions.TO_TIMESTAMP_LTZ,
-                FlinkSqlOperatorTable.TO_TIMESTAMP_LTZ);
         definitionSqlOperatorHashMap.put(
                 BuiltInFunctionDefinitions.TO_TIMESTAMP, 
FlinkSqlOperatorTable.TO_TIMESTAMP);
 
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
index a539e1f81b4..0c76c20474f 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
@@ -810,17 +810,6 @@ public class FlinkSqlOperatorTable extends 
ReflectiveSqlOperatorTable {
                             OperandTypes.family(SqlTypeFamily.CHARACTER, 
SqlTypeFamily.CHARACTER)),
                     SqlFunctionCategory.TIMEDATE);
 
-    public static final SqlFunction TO_TIMESTAMP_LTZ =
-            new SqlFunction(
-                    "TO_TIMESTAMP_LTZ",
-                    SqlKind.OTHER_FUNCTION,
-                    ReturnTypes.cascade(
-                            
ReturnTypes.explicit(SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE, 3),
-                            SqlTypeTransforms.FORCE_NULLABLE),
-                    null,
-                    OperandTypes.family(SqlTypeFamily.NUMERIC, 
SqlTypeFamily.INTEGER),
-                    SqlFunctionCategory.TIMEDATE);
-
     public static final SqlFunction TO_DATE =
             new SqlFunction(
                     "TO_DATE",
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala
index df70f7a8c97..752d3b82a0d 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala
@@ -23,7 +23,6 @@ import org.apache.flink.table.data.binary.{BinaryStringData, 
BinaryStringDataUti
 import org.apache.flink.table.functions.SqlLikeUtils
 import org.apache.flink.table.runtime.functions._
 import 
org.apache.flink.table.runtime.functions.SqlJsonUtils.JsonQueryReturnType
-import org.apache.flink.table.types.logical.LogicalTypeRoot
 import org.apache.flink.table.utils.DateTimeUtils
 import org.apache.flink.table.utils.DateTimeUtils.TimeUnitRange
 
@@ -326,18 +325,6 @@ object BuiltInMethods {
     classOf[Long],
     classOf[TimeZone])
 
-  val LONG_TO_TIMESTAMP_LTZ_WITH_PRECISION =
-    Types.lookupMethod(classOf[DateTimeUtils], "toTimestampData", 
classOf[Long], classOf[Int])
-
-  val DOUBLE_TO_TIMESTAMP_LTZ_WITH_PRECISION =
-    Types.lookupMethod(classOf[DateTimeUtils], "toTimestampData", 
classOf[Double], classOf[Int])
-
-  val DECIMAL_TO_TIMESTAMP_LTZ_WITH_PRECISION = Types.lookupMethod(
-    classOf[DateTimeUtils],
-    "toTimestampData",
-    classOf[DecimalData],
-    classOf[Int])
-
   val STRING_TO_TIMESTAMP =
     Types.lookupMethod(classOf[DateTimeUtils], "parseTimestampData", 
classOf[String])
 
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/FunctionGenerator.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/FunctionGenerator.scala
index 53db45fe7c2..deb931c9654 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/FunctionGenerator.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/FunctionGenerator.scala
@@ -402,27 +402,6 @@ class FunctionGenerator private (tableConfig: 
ReadableConfig) {
 
   addSqlFunction(HASH_CODE, Seq(DECIMAL), new HashCodeCallGen())
 
-  INTEGRAL_TYPES.foreach(
-    dt => {
-      addSqlFunctionMethod(
-        TO_TIMESTAMP_LTZ,
-        Seq(dt, INTEGER),
-        BuiltInMethods.LONG_TO_TIMESTAMP_LTZ_WITH_PRECISION)
-    })
-
-  FRACTIONAL_TYPES.foreach(
-    dt => {
-      addSqlFunctionMethod(
-        TO_TIMESTAMP_LTZ,
-        Seq(dt, INTEGER),
-        BuiltInMethods.DOUBLE_TO_TIMESTAMP_LTZ_WITH_PRECISION)
-    })
-
-  addSqlFunctionMethod(
-    TO_TIMESTAMP_LTZ,
-    Seq(DECIMAL, INTEGER),
-    BuiltInMethods.DECIMAL_TO_TIMESTAMP_LTZ_WITH_PRECISION)
-
   INTEGRAL_TYPES.foreach(
     dt => addSqlFunctionMethod(FROM_UNIXTIME, Seq(dt), 
BuiltInMethods.FROM_UNIXTIME))
 
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala
index 75f25eb610c..073c9e646fe 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala
@@ -21,7 +21,7 @@ import org.apache.flink.table.api.DataTypes
 import org.apache.flink.table.data.util.DataFormatConverters
 import org.apache.flink.table.planner.codegen.{CodeGeneratorContext, 
GeneratedExpression}
 import org.apache.flink.table.planner.codegen.CodeGenUtils._
-import 
org.apache.flink.table.planner.codegen.GenerateUtils.{generateCallIfArgsNotNull,
 generateCallIfArgsNullable, generateNonNullField, generateNullLiteral, 
generateStringResultCallIfArgsNotNull}
+import org.apache.flink.table.planner.codegen.GenerateUtils._
 import org.apache.flink.table.planner.codegen.calls.ScalarOperatorGens._
 import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable._
 import org.apache.flink.table.planner.functions.sql.SqlDefaultOperator
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/TimeFunctionsITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/TimeFunctionsITCase.java
index 8913d23c318..fab328c658b 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/TimeFunctionsITCase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/TimeFunctionsITCase.java
@@ -18,7 +18,9 @@
 
 package org.apache.flink.table.planner.functions;
 
+import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.JsonExistsOnError;
+import org.apache.flink.table.data.DecimalDataUtils;
 import org.apache.flink.table.expressions.TimeIntervalUnit;
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
 
@@ -27,6 +29,8 @@ import java.time.Instant;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.LocalTime;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
 import java.time.format.DateTimeFormatter;
 import java.util.stream.Stream;
 
@@ -34,6 +38,8 @@ import static org.apache.flink.table.api.DataTypes.BIGINT;
 import static org.apache.flink.table.api.DataTypes.BOOLEAN;
 import static org.apache.flink.table.api.DataTypes.DATE;
 import static org.apache.flink.table.api.DataTypes.DAY;
+import static org.apache.flink.table.api.DataTypes.DOUBLE;
+import static org.apache.flink.table.api.DataTypes.FLOAT;
 import static org.apache.flink.table.api.DataTypes.HOUR;
 import static org.apache.flink.table.api.DataTypes.INT;
 import static org.apache.flink.table.api.DataTypes.INTERVAL;
@@ -44,7 +50,10 @@ import static org.apache.flink.table.api.DataTypes.TIMESTAMP;
 import static org.apache.flink.table.api.DataTypes.TIMESTAMP_LTZ;
 import static org.apache.flink.table.api.Expressions.$;
 import static org.apache.flink.table.api.Expressions.call;
+import static org.apache.flink.table.api.Expressions.lit;
 import static org.apache.flink.table.api.Expressions.temporalOverlaps;
+import static org.apache.flink.table.api.Expressions.toTimestampLtz;
+import static 
org.apache.flink.table.planner.expressions.ExpressionBuilder.literal;
 
 /** Test time-related built-in functions. */
 class TimeFunctionsITCase extends BuiltInFunctionTestBase {
@@ -58,7 +67,8 @@ class TimeFunctionsITCase extends BuiltInFunctionTestBase {
                         extractTestCases(),
                         temporalOverlapsTestCases(),
                         ceilTestCases(),
-                        floorTestCases())
+                        floorTestCases(),
+                        toTimestampLtzTestCases())
                 .flatMap(s -> s);
     }
 
@@ -804,4 +814,197 @@ class TimeFunctionsITCase extends BuiltInFunctionTestBase 
{
                                         .format(TIMESTAMP_FORMATTER),
                                 STRING().nullable()));
     }
+
+    private Stream<TestSetSpec> toTimestampLtzTestCases() {
+        return Stream.of(
+                
TestSetSpec.forFunction(BuiltInFunctionDefinitions.TO_TIMESTAMP_LTZ)
+                        .onFieldsWithData(
+                                100,
+                                1234,
+                                -100,
+                                DecimalDataUtils.castFrom(-Double.MAX_VALUE, 
38, 18),
+                                100.01)
+                        .andDataTypes(
+                                DOUBLE(), BIGINT(), BIGINT(), 
DataTypes.DECIMAL(38, 18), FLOAT())
+                        .testResult(
+                                toTimestampLtz($("f0")),
+                                "TO_TIMESTAMP_LTZ(f0)",
+                                LocalDateTime.of(1970, 1, 1, 0, 0, 0, 
100000000)
+                                        .atZone(ZoneOffset.UTC)
+                                        .toInstant(),
+                                TIMESTAMP_LTZ(3).nullable())
+                        .testResult(
+                                toTimestampLtz($("f1"), literal(3)),
+                                "TO_TIMESTAMP_LTZ(f1, 3)",
+                                LocalDateTime.of(1970, 1, 1, 0, 0, 1, 
234000000)
+                                        .atZone(ZoneOffset.UTC)
+                                        .toInstant(),
+                                TIMESTAMP_LTZ(3).nullable())
+                        .testResult(
+                                toTimestampLtz($("f2"), literal(0)),
+                                "TO_TIMESTAMP_LTZ(f2, 0)",
+                                LocalDateTime.of(1969, 12, 31, 23, 58, 20)
+                                        .atZone(ZoneOffset.UTC)
+                                        .toInstant(),
+                                TIMESTAMP_LTZ(3).nullable())
+                        .testResult(
+                                toTimestampLtz($("f3"), literal(0)),
+                                "TO_TIMESTAMP_LTZ(-" + Double.MAX_VALUE + ", 
0)",
+                                null,
+                                TIMESTAMP_LTZ(3).nullable())
+                        .testResult(
+                                toTimestampLtz($("f4"), literal(3)),
+                                "TO_TIMESTAMP_LTZ(f4, 3)",
+                                LocalDateTime.of(1970, 1, 1, 0, 0, 0, 
100000000)
+                                        .atZone(ZoneOffset.UTC)
+                                        .toInstant(),
+                                TIMESTAMP_LTZ(3).nullable())
+                        .testResult(
+                                toTimestampLtz("2023-01-01 00:00:00"),
+                                "TO_TIMESTAMP_LTZ('2023-01-01 00:00:00')",
+                                LocalDateTime.of(2023, 1, 1, 0, 0, 0)
+                                        .atZone(ZoneOffset.UTC)
+                                        .toInstant(),
+                                TIMESTAMP_LTZ(3).nullable())
+                        .testResult(
+                                toTimestampLtz("01/01/2023 00:00:00", 
"dd/MM/yyyy HH:mm:ss"),
+                                "TO_TIMESTAMP_LTZ('01/01/2023 00:00:00', 
'dd/MM/yyyy HH:mm:ss')",
+                                LocalDateTime.of(2023, 1, 1, 0, 0, 0)
+                                        .atZone(ZoneOffset.UTC)
+                                        .toInstant(),
+                                TIMESTAMP_LTZ(3).nullable())
+                        .testResult(
+                                toTimestampLtz("1970-01-01 
00:00:00.123456789"),
+                                "TO_TIMESTAMP_LTZ('1970-01-01 
00:00:00.123456789')",
+                                LocalDateTime.of(1970, 1, 1, 0, 0, 0, 
123000000)
+                                        .atZone(ZoneOffset.UTC)
+                                        .toInstant(),
+                                TIMESTAMP_LTZ(3).nullable())
+                        .testResult(
+                                toTimestampLtz(
+                                        "1970-01-01 00:00:00.12345", 
"yyyy-MM-dd HH:mm:ss.SSSSS"),
+                                "TO_TIMESTAMP_LTZ('1970-01-01 00:00:00.12345', 
'yyyy-MM-dd HH:mm:ss.SSSSS')",
+                                LocalDateTime.of(1970, 1, 1, 0, 0, 0, 
123000000)
+                                        .atZone(ZoneOffset.UTC)
+                                        .toInstant(),
+                                TIMESTAMP_LTZ(3).nullable())
+                        .testResult(
+                                toTimestampLtz("20000202 59:59.1234567", 
"yyyyMMdd mm:ss.SSSSSSS"),
+                                "TO_TIMESTAMP_LTZ('20000202 59:59.1234567', 
'yyyyMMdd mm:ss.SSSSSSS')",
+                                LocalDateTime.of(2000, 2, 2, 0, 59, 59, 
123000000)
+                                        .atZone(ZoneOffset.UTC)
+                                        .toInstant(),
+                                TIMESTAMP_LTZ(3).nullable())
+                        .testResult(
+                                toTimestampLtz("1234567", "SSSSSSS"),
+                                "TO_TIMESTAMP_LTZ('1234567', 'SSSSSSS')",
+                                LocalDateTime.of(1970, 1, 1, 0, 0, 0, 
123000000)
+                                        .atZone(ZoneOffset.UTC)
+                                        .toInstant(),
+                                TIMESTAMP_LTZ(3).nullable())
+                        .testResult(
+                                toTimestampLtz(
+                                        "2017-09-15 00:00:00.12345", 
"yyyy-MM-dd HH:mm:ss.SSS"),
+                                "TO_TIMESTAMP_LTZ('2017-09-15 00:00:00.12345', 
'yyyy-MM-dd HH:mm:ss.SSS')",
+                                LocalDateTime.of(2017, 9, 15, 0, 0, 0, 
123000000)
+                                        .atZone(ZoneOffset.UTC)
+                                        .toInstant(),
+                                TIMESTAMP_LTZ(3).nullable())
+                        .testResult(
+                                toTimestampLtz(
+                                        "2023-01-01 00:00:00",
+                                        "yyyy-MM-dd HH:mm:ss",
+                                        "Asia/Shanghai"),
+                                "TO_TIMESTAMP_LTZ('2023-01-01 00:00:00', 
'yyyy-MM-dd HH:mm:ss', 'Asia/Shanghai')",
+                                LocalDateTime.of(2023, 1, 1, 0, 0, 0)
+                                        .atZone(ZoneId.of("Asia/Shanghai"))
+                                        .toInstant(),
+                                TIMESTAMP_LTZ(3).nullable())
+                        .testResult(
+                                toTimestampLtz("2023-01-01 00:00:00", 
"yyyy-MM-dd HH:mm:ss", "UTC"),
+                                "TO_TIMESTAMP_LTZ('2023-01-01 00:00:00', 
'yyyy-MM-dd HH:mm:ss', 'UTC')",
+                                LocalDateTime.of(2023, 1, 1, 0, 0, 0)
+                                        .atZone(ZoneOffset.UTC)
+                                        .toInstant(),
+                                TIMESTAMP_LTZ(3).nullable())
+                        .testResult(
+                                toTimestampLtz(
+                                        "01/01/2023 08:00:00",
+                                        "dd/MM/yyyy HH:mm:ss",
+                                        "America/Los_Angeles"),
+                                "TO_TIMESTAMP_LTZ('01/01/2023 08:00:00', 
'dd/MM/yyyy HH:mm:ss', 'America/Los_Angeles')",
+                                LocalDateTime.of(2023, 1, 1, 8, 0, 0)
+                                        
.atZone(ZoneId.of("America/Los_Angeles"))
+                                        .toInstant(),
+                                TIMESTAMP_LTZ(3).nullable())
+                        .testResult(
+                                toTimestampLtz(
+                                        "01/01/2023 08:00:00",
+                                        literal("yyyy-MM-dd HH:mm:ss"),
+                                        literal("un-parsable timezone")),
+                                "TO_TIMESTAMP_LTZ('01/01/2023 08:00:00', 
'yyyy-MM-dd HH:mm:ss', 'un-parsable timezone')",
+                                null,
+                                TIMESTAMP_LTZ(3).nullable())
+                        .testResult(
+                                toTimestampLtz(
+                                        "01/01/2023 08:00:00",
+                                        literal("un-parsable format"),
+                                        literal("UTC")),
+                                "TO_TIMESTAMP_LTZ('01/01/2023 08:00:00', 
'un-parsable format', 'UTC')",
+                                null,
+                                TIMESTAMP_LTZ(3).nullable())
+                        .testResult(
+                                toTimestampLtz(
+                                        "un-parsable timestamp",
+                                        literal("yyyy-MM-dd HH:mm:ss"),
+                                        literal("UTC")),
+                                "TO_TIMESTAMP_LTZ('un-parsable timestamp', 
'yyyy-MM-dd HH:mm:ss', 'UTC')",
+                                null,
+                                TIMESTAMP_LTZ(3).nullable())
+                        .testResult(
+                                toTimestampLtz(lit(123L), lit(null, 
DataTypes.INT())),
+                                "TO_TIMESTAMP_LTZ(123, CAST(NULL AS INTEGER))",
+                                null,
+                                TIMESTAMP_LTZ(3).nullable())
+                        .testResult(
+                                toTimestampLtz(lit(null, DataTypes.INT()), 3),
+                                "TO_TIMESTAMP_LTZ(123, CAST(NULL AS INTEGER))",
+                                null,
+                                TIMESTAMP_LTZ(3).nullable())
+                        .testResult(
+                                toTimestampLtz(null),
+                                "TO_TIMESTAMP_LTZ(NULL)",
+                                null,
+                                TIMESTAMP_LTZ(3).nullable())
+                        .testResult(
+                                toTimestampLtz(null, "yyyy-MM-dd 
HH:mm:ss.SSS"),
+                                "TO_TIMESTAMP_LTZ(NULL, 'yyyy-MM-dd 
HH:mm:ss.SSS')",
+                                null,
+                                TIMESTAMP_LTZ(3).nullable())
+                        .testResult(
+                                toTimestampLtz("1970-01-01 00:00:00.12345", 
null),
+                                "TO_TIMESTAMP_LTZ('1970-01-01 00:00:00.12345', 
NULL)",
+                                null,
+                                TIMESTAMP_LTZ(3).nullable())
+                        .testResult(
+                                toTimestampLtz(null, "dd/MM/yyyy HH:mm:ss", 
"America/Los_Angeles"),
+                                "TO_TIMESTAMP_LTZ(NULL, 'dd/MM/yyyy HH:mm:ss', 
'America/Los_Angeles')",
+                                null,
+                                TIMESTAMP_LTZ(3).nullable())
+                        .testResult(
+                                toTimestampLtz("2023-01-01 00:00:00", null, 
"America/Los_Angeles"),
+                                "TO_TIMESTAMP_LTZ('2023-01-01 00:00:00', NULL, 
'America/Los_Angeles')",
+                                null,
+                                TIMESTAMP_LTZ(3).nullable())
+                        .testResult(
+                                toTimestampLtz("2023-01-01 00:00:00", 
"dd/MM/yyyy HH:mm:ss", null),
+                                "TO_TIMESTAMP_LTZ('2023-01-01 00:00:00', 
'dd/MM/yyyy HH:mm:ss', NULL)",
+                                null,
+                                TIMESTAMP_LTZ(3).nullable())
+                        .testResult(
+                                toTimestampLtz(null),
+                                "TO_TIMESTAMP_LTZ(NULL)",
+                                null,
+                                TIMESTAMP_LTZ(3).nullable()));
+    }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/TimeTravelTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/TimeTravelTest.java
index 89acd62c5d6..449685ade3c 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/TimeTravelTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/TimeTravelTest.java
@@ -133,7 +133,7 @@ class TimeTravelTest extends TableTestBase {
                                                         + "    t1 FOR 
SYSTEM_TIME AS OF TO_TIMESTAMP_LTZ (0, 3)"))
                 .isInstanceOf(ValidationException.class)
                 .hasMessageContaining(
-                        "Unsupported time travel expression: 
TO_TIMESTAMP_LTZ(0, 3) for the expression can not be reduced to a constant by 
Flink.");
+                        "SQL validation failed. Unsupported time travel 
expression: `TO_TIMESTAMP_LTZ`(0, 3) for the expression can not be reduced to a 
constant by Flink.");
 
         assertThatThrownBy(
                         () ->
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/TemporalTypesTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/TemporalTypesTest.scala
index 4fba6e34a08..351a2d4095c 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/TemporalTypesTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/TemporalTypesTest.scala
@@ -18,6 +18,7 @@
 package org.apache.flink.table.planner.expressions
 
 import org.apache.flink.table.api._
+import org.apache.flink.table.api.Expressions.toTimestampLtz
 import org.apache.flink.table.expressions.TimeIntervalUnit
 import org.apache.flink.table.planner.codegen.CodeGenException
 import org.apache.flink.table.planner.expressions.utils.ExpressionTestBase
@@ -1282,11 +1283,6 @@ class TemporalTypesTest extends ExpressionTestBase {
       s"TO_TIMESTAMP_LTZ(253402300800000, 3)",
       "NULL")
 
-    // test invalid number of arguments
-    testExpectedSqlException(
-      "TO_TIMESTAMP_LTZ(123)",
-      "Invalid number of arguments to function 'TO_TIMESTAMP_LTZ'. Was 
expecting 2 arguments")
-
     // invalid precision
     testExpectedAllApisException(
       toTimestampLtz(12, 1),
@@ -1308,29 +1304,28 @@ class TemporalTypesTest extends ExpressionTestBase {
     // invalid type for the first input
     testExpectedSqlException(
       "TO_TIMESTAMP_LTZ('test_string_type', 0)",
-      "Cannot apply 'TO_TIMESTAMP_LTZ' to arguments of type" +
-        " 'TO_TIMESTAMP_LTZ(<CHAR(16)>, <INTEGER>)'. Supported form(s):" +
-        " 'TO_TIMESTAMP_LTZ(<NUMERIC>, <INTEGER>)'",
+      "SQL validation failed. Invalid function call:\n" +
+        "TO_TIMESTAMP_LTZ(CHAR(16) NOT NULL, INT NOT NULL)",
       classOf[ValidationException]
     )
+
     testExpectedTableApiException(
       toTimestampLtz("test_string_type", 0),
-      "Unsupported argument type. " +
-        "Expected type of family 'NUMERIC' but actual type was 'CHAR(16) NOT 
NULL'"
+      "Invalid function call:\n" +
+        "TO_TIMESTAMP_LTZ(CHAR(16) NOT NULL, INT NOT NULL)"
     )
 
     // invalid type for the second input
     testExpectedSqlException(
       "TO_TIMESTAMP_LTZ(123, 'test_string_type')",
-      "Cannot apply 'TO_TIMESTAMP_LTZ' to arguments of type" +
-        " 'TO_TIMESTAMP_LTZ(<INTEGER>, <CHAR(16)>)'. Supported form(s):" +
-        " 'TO_TIMESTAMP_LTZ(<NUMERIC>, <INTEGER>)'"
+      "SQL validation failed. Invalid function call:\n" +
+        "TO_TIMESTAMP_LTZ(INT NOT NULL, CHAR(16) NOT NULL)"
     )
 
     testExpectedTableApiException(
       toTimestampLtz(123, "test_string_type"),
-      "Unsupported argument type. " +
-        "Expected type of family 'INTEGER_NUMERIC' but actual type was 
'CHAR(16) NOT NULL'"
+      "Invalid function call:\n" +
+        "TO_TIMESTAMP_LTZ(INT NOT NULL, CHAR(16) NOT NULL)"
     )
   }
 
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ToTimestampLtzFunction.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ToTimestampLtzFunction.java
new file mode 100644
index 00000000000..e1b8478c210
--- /dev/null
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ToTimestampLtzFunction.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.table.utils.DateTimeUtils;
+
+import javax.annotation.Nullable;
+
+import java.time.DateTimeException;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+
+import static org.apache.flink.table.utils.DateTimeUtils.parseTimestampData;
+
+/**
+ * Implementation of {@link BuiltInFunctionDefinitions#TO_TIMESTAMP_LTZ}.
+ *
+ * <p>A function that converts various time formats to TIMESTAMP_LTZ type.
+ *
+ * <p>Supported function signatures:
+ *
+ * <ul>
+ *   <li>{@code TO_TIMESTAMP_LTZ(numeric)} -> TIMESTAMP_LTZ(3) <br>
+ *       Converts numeric epoch time in milliseconds to timestamp with local 
timezone
+ *   <li>{@code TO_TIMESTAMP_LTZ(numeric, precision)} -> 
TIMESTAMP_LTZ(precision) <br>
+ *       Converts numeric epoch time to timestamp with specified precision (0 
as seconds, 3 as
+ *       milliseconds)
+ *   <li>{@code TO_TIMESTAMP_LTZ(timestamp)} -> TIMESTAMP_LTZ(3) <br>
+ *       Parses string timestamp using default format 'yyyy-MM-dd HH:mm:ss'
+ *   <li>{@code TO_TIMESTAMP_LTZ(timestamp, format)} -> TIMESTAMP_LTZ(3) <br>
+ *       Parses string timestamp using input string of format
+ *   <li>{@code TO_TIMESTAMP_LTZ(timestamp, format, timezone)} -> 
TIMESTAMP_LTZ(3) <br>
+ *       Parses string timestamp using input strings of format and timezone
+ * </ul>
+ *
+ * <p>Example:
+ *
+ * <pre>{@code
+ * TO_TIMESTAMP_LTZ('2023-01-01 10:00:00')  // Parses string using default 
format
+ * TO_TIMESTAMP_LTZ(1234567890123)  // Converts epoch milliseconds
+ * TO_TIMESTAMP_LTZ(1234567890, 0)     // Converts epoch seconds
+ * TO_TIMESTAMP_LTZ(1234567890123, 3)  // Converts epoch milliseconds
+ * TO_TIMESTAMP_LTZ('2023-01-01 10:00:00')  // Parses string using default 
format
+ * TO_TIMESTAMP_LTZ('2023-01-01T10:00:00', 'yyyy-MM-dd\'T\'HH:mm:ss') // 
Parses string using input format
+ * TO_TIMESTAMP_LTZ('2023-01-01 10:00:00', 'yyyy-MM-dd HH:mm:ss', 'UTC') // 
Parses string using input format and timezone
+ * }</pre>
+ */
+@Internal
+public class ToTimestampLtzFunction extends BuiltInScalarFunction {
+
+    private static final int DEFAULT_PRECISION = 3;
+
+    public ToTimestampLtzFunction(SpecializedFunction.SpecializedContext 
context) {
+        super(BuiltInFunctionDefinitions.TO_TIMESTAMP_LTZ, context);
+    }
+
+    public @Nullable TimestampData eval(Number epoch, Integer precision) {
+        if (epoch == null || precision == null) {
+            return null;
+        }
+        if (epoch instanceof Float || epoch instanceof Double) {
+            return DateTimeUtils.toTimestampData(epoch.doubleValue(), 
precision);
+        }
+        return DateTimeUtils.toTimestampData(epoch.longValue(), precision);
+    }
+
+    public @Nullable TimestampData eval(DecimalData epoch, Integer precision) {
+        if (epoch == null || precision == null) {
+            return null;
+        }
+
+        return DateTimeUtils.toTimestampData(epoch, precision);
+    }
+
+    public @Nullable TimestampData eval(Number epoch) {
+        return eval(epoch, DEFAULT_PRECISION);
+    }
+
+    public @Nullable TimestampData eval(DecimalData epoch) {
+        return eval(epoch, DEFAULT_PRECISION);
+    }
+
+    public @Nullable TimestampData eval(StringData timestamp) {
+        if (timestamp == null) {
+            return null;
+        }
+
+        return parseTimestampData(timestamp.toString());
+    }
+
+    public @Nullable TimestampData eval(StringData timestamp, StringData 
format) {
+        if (timestamp == null || format == null) {
+            return null;
+        }
+
+        return parseTimestampData(timestamp.toString(), format.toString());
+    }
+
+    public @Nullable TimestampData eval(
+            StringData dateStr, StringData format, StringData timezone) {
+        if (dateStr == null || format == null || timezone == null) {
+            return null;
+        }
+
+        TimestampData ts = parseTimestampData(dateStr.toString(), 
format.toString());
+        if (ts == null) {
+            return null;
+        }
+
+        try {
+            ZonedDateTime zoneDate = 
ts.toLocalDateTime().atZone(ZoneId.of(timezone.toString()));
+            return TimestampData.fromInstant(zoneDate.toInstant());
+        } catch (DateTimeException e) {
+            return null;
+        }
+    }
+}

Reply via email to