This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new b5a1575f4b4 [FLINK-39666][table] Fix edge-case bugs in TO_TIMESTAMP_LTZ precision handling
b5a1575f4b4 is described below

commit b5a1575f4b4e23ae334600c8b06a98e63eab6d2c
Author: Ramin Gharib <[email protected]>
AuthorDate: Mon May 18 12:19:35 2026 +0200

    [FLINK-39666][table] Fix edge-case bugs in TO_TIMESTAMP_LTZ precision handling
    
    This closes #28146.
---
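Reader note: the format-based precision rule touched by this commit can be tried in isolation. The following is a minimal standalone sketch that mirrors the loop shown in the DateTimeUtils hunk of this diff. The class name FormatPrecisionSketch and the main method are illustrative assumptions and are not part of Flink.

```java
// Standalone sketch; FormatPrecisionSketch is a hypothetical name, not a Flink class.
final class FormatPrecisionSketch {
    private static final int DEFAULT_PRECISION = 3;
    private static final int MAX_PRECISION = 9;

    /**
     * Returns the longest run of 'S' characters found outside single-quoted
     * literal sections, clamped to [DEFAULT_PRECISION, MAX_PRECISION].
     */
    static int precisionFromFormat(String format) {
        int maxRun = 0;
        int run = 0;
        boolean inQuotes = false;
        for (int i = 0; i < format.length(); i++) {
            char c = format.charAt(i);
            if (c == '\'') {
                // A quote toggles literal mode and always ends the current run.
                inQuotes = !inQuotes;
                maxRun = Math.max(maxRun, run);
                run = 0;
            } else if (!inQuotes && c == 'S') {
                run++;
            } else {
                // Any other character (or an 'S' inside quotes) ends the run.
                maxRun = Math.max(maxRun, run);
                run = 0;
            }
        }
        maxRun = Math.max(maxRun, run);
        return Math.max(Math.min(maxRun, MAX_PRECISION), DEFAULT_PRECISION);
    }

    public static void main(String[] args) {
        // Fewer than three 'S' characters: clamped up to 3.
        System.out.println(precisionFromFormat("yyyy-MM-dd HH:mm:ss.SS"));
        // The run no longer has to be trailing: 6.
        System.out.println(precisionFromFormat("yyyy-MM-dd HH:mm:ss.SSSSSS X"));
        // 'S' characters inside a quoted literal are ignored: 3.
        System.out.println(precisionFromFormat("'SSSSSS' HH:mm:ss"));
        // More than nine 'S' characters: clamped down to 9.
        System.out.println(precisionFromFormat("ss.SSSSSSSSSSSS"));
    }
}
```

The second call is the case the old trailing-'S' count missed: a pattern ending in a zone designator such as ' X' previously fell back to precision 3.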
 docs/data/sql_functions.yml                        |  12 +-
 docs/data/sql_functions_zh.yml                     |  12 +-
 flink-python/pyflink/table/expressions.py          |  21 +++-
 .../org/apache/flink/table/api/Expressions.java    |  89 +++++++++++---
 .../table/expressions/ValueLiteralExpression.java  |  16 ++-
 .../strategies/ToTimestampLtzTypeStrategy.java     |  61 +++++++---
 .../apache/flink/table/utils/DateTimeUtils.java    |  62 +++++++---
 .../flink/table/expressions/ExpressionTest.java    |  10 +-
 .../planner/functions/TimeFunctionsITCase.java     | 129 +++++++++++++++++++++
 9 files changed, 349 insertions(+), 63 deletions(-)
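
Reader note: the overflow check changed in ValueLiteralExpression can also be exercised on its own. This is a hedged sketch that copies the logic of the new canRepresentAsLong from the hunk in this diff and runs it against the two boundary values added to ExpressionTest. The class name EpochOverflowSketch is an assumption for illustration, and precision is assumed to be within [0, 9].

```java
import java.time.Instant;

// Standalone sketch; EpochOverflowSketch is a hypothetical name, not a Flink class.
final class EpochOverflowSketch {

    /**
     * Checks whether epochSeconds * 10^precision plus the sub-second part
     * fits into a long. Assumes 0 <= precision <= 9.
     */
    static boolean canRepresentAsLong(Instant instant, int precision) {
        final long factor = (long) Math.pow(10, precision);
        final long nanoDivisor = (long) Math.pow(10, 9 - precision);
        final long epochSeconds = instant.getEpochSecond();
        // Instant.getNano() is always in [0, 999_999_999], so nanoPart >= 0.
        final long nanoPart = instant.getNano() / nanoDivisor;
        if (epochSeconds > Long.MAX_VALUE / factor || epochSeconds < Long.MIN_VALUE / factor) {
            return false;
        }
        final long base = epochSeconds * factor;
        // A negative base plus a non-negative nanoPart cannot overflow upward.
        return base < 0 || nanoPart <= Long.MAX_VALUE - base;
    }

    public static void main(String[] args) {
        // Seconds alone fit at precision 9, but the extra 0.9 s pushes past Long.MAX_VALUE.
        System.out.println(canRepresentAsLong(Instant.ofEpochSecond(9223372036L, 900_000_000), 9));
        // The negative boundary with no sub-second part still fits.
        System.out.println(canRepresentAsLong(Instant.ofEpochSecond(-9223372036L, 0), 9));
        // An ordinary millisecond timestamp fits comfortably.
        System.out.println(canRepresentAsLong(Instant.ofEpochSecond(1234567890L, 123_000_000), 3));
    }
}
```

The first case is the one the previous check (a comparison on the absolute value of epochSeconds only) accepted even though adding the nanos term overflows.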

diff --git a/docs/data/sql_functions.yml b/docs/data/sql_functions.yml
index 53a1ad5dd18..8fcdb58d5c1 100644
--- a/docs/data/sql_functions.yml
+++ b/docs/data/sql_functions.yml
@@ -697,7 +697,12 @@ temporal:
 
       Other precision values between 0 and 9 are also supported, where the numeric value represents units of 10^(-precision) seconds.
       The output type is TIMESTAMP_LTZ(3) for precision 0-3, and TIMESTAMP_LTZ(precision) for precision 4-9.
-      Returns NULL if any input is NULL.
+      Returns NULL if any input is NULL. Throws a runtime error if precision is outside [0, 9].
+
+      Note: the output type is determined at plan time. If precision is supplied as a non-literal
+      expression (e.g., a column reference), the precision cannot be inspected at plan time and the
+      output defaults to TIMESTAMP_LTZ(3). The runtime still interprets the numeric input using the
+      row value of precision, but any sub-millisecond digits are truncated to fit the declared type.
 
       E.g., TO_TIMESTAMP_LTZ(1234567890, 0) returns TIMESTAMP_LTZ(3) '2009-02-13 23:31:30.000',
       TO_TIMESTAMP_LTZ(1234567890123, 3) returns TIMESTAMP_LTZ(3) '2009-02-13 23:31:30.123',
@@ -711,7 +716,10 @@ temporal:
       - string2: the format pattern (default 'yyyy-MM-dd HH:mm:ss'). The pattern follows Java's DateTimeFormatter syntax, where 'S' represents fractional seconds (e.g., 'SSS' for milliseconds, 'SSSSSSSSS' for nanoseconds).
       - string3: the time zone of the input string (default 'UTC'). Supports zone IDs such as 'UTC', 'Asia/Shanghai', or 'America/Los_Angeles'.
 
-      The output precision is inferred from the number of 'S' characters in the format pattern, with a minimum of 3. E.g., format 'yyyy-MM-dd HH:mm:ss.SS' returns TIMESTAMP_LTZ(3), format 'yyyy-MM-dd HH:mm:ss.SSSSSS' returns TIMESTAMP_LTZ(6).
+      The output precision is inferred from the longest run of 'S' characters in the format pattern (outside quoted literal sections), clamped to [3, 9]. E.g., format 'yyyy-MM-dd HH:mm:ss.SS' returns TIMESTAMP_LTZ(3), format 'yyyy-MM-dd HH:mm:ss.SSSSSS' returns TIMESTAMP_LTZ(6), format 'yyyy-MM-dd HH:mm:ss.SSSSSS X' also returns TIMESTAMP_LTZ(6).
+
+      Note: this inference only applies when the format pattern is a literal at plan time. If the format is supplied as a non-literal expression (e.g., a column reference), the pattern cannot be inspected at plan time and the output defaults to TIMESTAMP_LTZ(3). The runtime still parses with the actual row pattern, but any sub-millisecond digits are truncated to fit the declared type. To get TIMESTAMP_LTZ(6) or TIMESTAMP_LTZ(9), the format argument must appear as a literal in the SQL text.
+
       Returns NULL if any input is NULL.
 
       E.g., TO_TIMESTAMP_LTZ('2023-01-01 00:00:00') parses using default format and UTC,
diff --git a/docs/data/sql_functions_zh.yml b/docs/data/sql_functions_zh.yml
index 12a952bd968..d0824284818 100644
--- a/docs/data/sql_functions_zh.yml
+++ b/docs/data/sql_functions_zh.yml
@@ -823,7 +823,12 @@ temporal:
 
       Other precision values between 0 and 9 are also supported, where the numeric value represents units of 10^(-precision) seconds.
       The output type is TIMESTAMP_LTZ(3) for precision 0-3, and TIMESTAMP_LTZ(precision) for precision 4-9.
-      Returns NULL if any input is NULL.
+      Returns NULL if any input is NULL. Throws a runtime error if precision is outside [0, 9].
+
+      Note: the output type is determined at plan time. If precision is supplied as a non-literal
+      expression (e.g., a column reference), the precision cannot be inspected at plan time and the
+      output defaults to TIMESTAMP_LTZ(3). The runtime still interprets the numeric input using the
+      row value of precision, but any sub-millisecond digits are truncated to fit the declared type.
 
       E.g., TO_TIMESTAMP_LTZ(1234567890, 0) returns TIMESTAMP_LTZ(3) '2009-02-13 23:31:30.000',
       TO_TIMESTAMP_LTZ(1234567890123, 3) returns TIMESTAMP_LTZ(3) '2009-02-13 23:31:30.123',
@@ -837,7 +842,10 @@ temporal:
       - string2: the format pattern (default 'yyyy-MM-dd HH:mm:ss'). The pattern follows Java's DateTimeFormatter syntax, where 'S' represents fractional seconds (e.g., 'SSS' for milliseconds, 'SSSSSSSSS' for nanoseconds).
       - string3: the time zone of the input string (default 'UTC'). Supports zone IDs such as 'UTC', 'Asia/Shanghai', or 'America/Los_Angeles'.
 
-      The output precision is inferred from the number of 'S' characters in the format pattern, with a minimum of 3. E.g., format 'yyyy-MM-dd HH:mm:ss.SS' returns TIMESTAMP_LTZ(3), format 'yyyy-MM-dd HH:mm:ss.SSSSSS' returns TIMESTAMP_LTZ(6).
+      The output precision is inferred from the longest run of 'S' characters in the format pattern (outside quoted literal sections), clamped to [3, 9]. E.g., format 'yyyy-MM-dd HH:mm:ss.SS' returns TIMESTAMP_LTZ(3), format 'yyyy-MM-dd HH:mm:ss.SSSSSS' returns TIMESTAMP_LTZ(6), format 'yyyy-MM-dd HH:mm:ss.SSSSSS X' also returns TIMESTAMP_LTZ(6).
+
+      Note: this inference only applies when the format pattern is a literal at plan time. If the format is supplied as a non-literal expression (e.g., a column reference), the pattern cannot be inspected at plan time and the output defaults to TIMESTAMP_LTZ(3). The runtime still parses with the actual row pattern, but any sub-millisecond digits are truncated to fit the declared type. To get TIMESTAMP_LTZ(6) or TIMESTAMP_LTZ(9), the format argument must appear as a literal in the SQL text.
+
       Returns NULL if any input is NULL.
 
       E.g., TO_TIMESTAMP_LTZ('2023-01-01 00:00:00') parses using default format and UTC,
diff --git a/flink-python/pyflink/table/expressions.py b/flink-python/pyflink/table/expressions.py
index 7f0fe4956c9..b7c48954ba7 100644
--- a/flink-python/pyflink/table/expressions.py
+++ b/flink-python/pyflink/table/expressions.py
@@ -346,19 +346,30 @@ def to_timestamp_ltz(*args) -> Expression:
        value represents units of 10^(-precision) seconds. The output type is
        TIMESTAMP_LTZ(3) for precision 0-3, and TIMESTAMP_LTZ(precision) for 4-9.
 
+       The output type is determined at plan time. If precision is supplied as a
+       non-literal expression (e.g., a column reference), the output defaults to
+       TIMESTAMP_LTZ(3) and any sub-millisecond digits are truncated to fit the
+       declared type.
+
     3. to_timestamp_ltz(string) -> TIMESTAMP_LTZ(3)
        Parses a timestamp string using default format 'yyyy-MM-dd HH:mm:ss'.
 
-    4. to_timestamp_ltz(string, format) -> TIMESTAMP_LTZ(max(fractional_digits, 3))
+    4. to_timestamp_ltz(string, format) -> TIMESTAMP_LTZ(precision)
        Parses a timestamp string using the given format pattern. The output precision
-       is inferred from the number of 'S' characters in the format (e.g., 'SSS' -> 3,
-       'SSSSSS' -> 6, 'SSSSSSSSS' -> 9), with a minimum of 3.
+       is inferred from the longest run of 'S' characters in the format pattern
+       (outside quoted literal sections), clamped to [3, 9]. E.g., 'SSS' -> 3,
+       'SSSSSS' -> 6, 'SSSSSSSSS' -> 9, 'SSSSSS X' -> 6.
+
+       This inference only applies when the format pattern is a literal at plan time.
+       If the format is supplied as a non-literal expression (e.g., a column
+       reference), the output defaults to TIMESTAMP_LTZ(3) and any sub-millisecond
+       digits are truncated to fit the declared type.
 
-    5. to_timestamp_ltz(string, format, timezone) -> TIMESTAMP_LTZ(max(fractional_digits, 3))
+    5. to_timestamp_ltz(string, format, timezone) -> TIMESTAMP_LTZ(precision)
        Parses a timestamp string using the given format pattern in the specified time zone.
        The output precision is inferred from the format pattern as in signature 4.
 
-    Returns NULL if any input is NULL.
+    Returns NULL if any input is NULL. Throws a runtime error if precision is outside [0, 9].
 
     Example:
     ::
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java
index a4552b25f5e..a2a9544fb35 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java
@@ -366,15 +366,32 @@ public final class Expressions {
     /**
      * Converts a numeric type epoch time to {@link DataTypes#TIMESTAMP_LTZ(int)}.
      *
-     * <p>The supported precision is 0 or 3:
+     * <p>The supported precision is between 0 and 9 inclusive. It determines the unit of the
+     * numeric value:
      *
      * <ul>
-     *   <li>0 means the numericEpochTime is in second.
-     *   <li>3 means the numericEpochTime is in millisecond.
+     *   <li>0: seconds since epoch
+     *   <li>3: milliseconds since epoch (default)
+     *   <li>6: microseconds since epoch
+     *   <li>9: nanoseconds since epoch
      * </ul>
      *
+     * <p>Other values in {@code [0, 9]} represent units of {@code 10^(-precision)} seconds. The
+     * output type is {@code TIMESTAMP_LTZ(3)} for precision 0-3, and {@code
+     * TIMESTAMP_LTZ(precision)} for precision 4-9. When precision is supplied as a non-literal
+     * expression the output defaults to {@code TIMESTAMP_LTZ(3)} and sub-millisecond digits are
+     * truncated.
+     *
+     * <p>Example:
+     *
+     * <pre>{@code
+     * toTimestampLtz(1234567890L, 0)             // epoch seconds
+     * toTimestampLtz(1234567890123L, 3)          // epoch milliseconds
+     * toTimestampLtz(1234567890123456789L, 9)    // epoch nanoseconds
+     * }</pre>
+     *
      * @param numericEpochTime The epoch time with numeric type.
-     * @param precision The precision to indicate the epoch time is in second or millisecond.
+     * @param precision The precision (0-9) that determines the unit of the numeric value.
      * @return The timestamp value with {@link DataTypes#TIMESTAMP_LTZ(int)} type.
      */
     public static ApiExpression toTimestampLtz(Object numericEpochTime, Object precision) {
@@ -383,10 +400,26 @@ public final class Expressions {
 
     /**
      * Converts the given time string with the specified format to {@link
-     * DataTypes#TIMESTAMP_LTZ(int)}.
+     * DataTypes#TIMESTAMP_LTZ(int)} under the {@code UTC} time zone.
+     *
+     * <p>The output precision is inferred from the longest run of {@code 'S'} characters in the
+     * format pattern (outside quoted literal sections), clamped to {@code [3, 9]}. For example,
+     * {@code 'yyyy-MM-dd HH:mm:ss.SSS'} produces {@code TIMESTAMP_LTZ(3)}, {@code 'yyyy-MM-dd
+     * HH:mm:ss.SSSSSS X'} produces {@code TIMESTAMP_LTZ(6)}.
+     *
+     * <p>This inference only applies when the format is a literal at plan time. If the format is
+     * supplied as a non-literal expression, the output defaults to {@code TIMESTAMP_LTZ(3)} and
+     * sub-millisecond digits are truncated.
+     *
+     * <p>Example:
+     *
+     * <pre>{@code
+     * // Output type TIMESTAMP_LTZ(6), inferred from the run of 6 'S' characters.
+     * toTimestampLtz("2023-01-01 00:00:00.123456 Z", "yyyy-MM-dd HH:mm:ss.SSSSSS X")
+     * }</pre>
      *
      * @param timestampStr The timestamp string to convert.
-     * @param format The format of the string.
+     * @param format The format pattern to parse the timestamp string.
      * @return The timestamp value with {@link DataTypes#TIMESTAMP_LTZ(int)} type.
      */
     public static ApiExpression toTimestampLtz(String timestampStr, String format) {
@@ -394,41 +427,59 @@ public final class Expressions {
     }
 
     /**
-     * Converts a timestamp to {@link DataTypes#TIMESTAMP_LTZ(int)}.
+     * Converts the given timestamp string to {@link DataTypes#TIMESTAMP_LTZ(int)} using the default
+     * format {@code 'yyyy-MM-dd HH:mm:ss'} under the {@code UTC} time zone.
+     *
+     * <p>Example:
      *
-     * <p>This method takes a string representing a timestamp and converts it to a TIMESTAMP_LTZ
-     * using the built-in TO_TIMESTAMP_LTZ function definition.
+     * <pre>{@code
+     * toTimestampLtz("2023-01-01 00:00:00")
+     * }</pre>
      *
      * @param timeStamp The timestamp string to be converted.
-     * @return The timestamp value with {@link DataTypes#TIMESTAMP_LTZ(int)} type.
+     * @return The timestamp value with {@link DataTypes#TIMESTAMP_LTZ(int)} type at precision 3.
      */
     public static ApiExpression toTimestampLtz(String timeStamp) {
         return apiCall(BuiltInFunctionDefinitions.TO_TIMESTAMP_LTZ, timeStamp);
     }
 
     /**
-     * Converts a numeric type epoch time to {@link DataTypes#TIMESTAMP_LTZ(int)}.
+     * Converts a numeric epoch time in milliseconds to {@link DataTypes#TIMESTAMP_LTZ(int)}.
      *
-     * <p>This method takes an object representing an epoch time and converts it to a TIMESTAMP_LTZ
-     * using the built-in TO_TIMESTAMP_LTZ function definition.
+     * <p>Equivalent to calling {@link #toTimestampLtz(Object, Object)} with precision {@code 3}.
      *
-     * @param numericEpochTime The epoch time with numeric type.
-     * @return The timestamp value with {@link DataTypes#TIMESTAMP_LTZ(int)} type.
+     * <p>Example:
+     *
+     * <pre>{@code
+     * toTimestampLtz(1234567890123L)   // epoch milliseconds
+     * }</pre>
+     *
+     * @param numericEpochTime The epoch time in milliseconds.
+     * @return The timestamp value with {@link DataTypes#TIMESTAMP_LTZ(int)} type at precision 3.
      */
     public static ApiExpression toTimestampLtz(Object numericEpochTime) {
         return apiCall(BuiltInFunctionDefinitions.TO_TIMESTAMP_LTZ, numericEpochTime);
     }
 
     /**
-     * Converts a string timestamp with the custom format and timezone to {@link
+     * Converts the given time string with the specified format and timezone to {@link
      * DataTypes#TIMESTAMP_LTZ(int)}.
      *
-     * <p>The timestamp string will be parsed using the custom format and timezone, and converted to
-     * a TIMESTAMP_LTZ value.
+     * <p>The output precision is inferred from the longest run of {@code 'S'} characters in the
+     * format pattern (outside quoted literal sections), clamped to {@code [3, 9]}. This inference
+     * only applies when the format is a literal at plan time; for non-literal formats the output
+     * defaults to {@code TIMESTAMP_LTZ(3)} and sub-millisecond digits are truncated.
+     *
+     * <p>Example:
+     *
+     * <pre>{@code
+     * toTimestampLtz("2023-01-01 08:00:00", "yyyy-MM-dd HH:mm:ss", "Asia/Shanghai")
+     * }</pre>
      *
      * @param timestampStr The timestamp string to convert.
      * @param format The format pattern to parse the timestamp string.
-     * @param timezone The timezone to use for the conversion.
+     * @param timezone The timezone to use for the conversion (e.g. {@code 'UTC'}, {@code
+     *     'Asia/Shanghai'}).
      * @return The timestamp value with {@link DataTypes#TIMESTAMP_LTZ(int)} type.
      */
     public static ApiExpression toTimestampLtz(
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/ValueLiteralExpression.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/ValueLiteralExpression.java
index 7a76a36a37c..36adeb8a2d6 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/ValueLiteralExpression.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/ValueLiteralExpression.java
@@ -462,11 +462,19 @@ public final class ValueLiteralExpression implements ResolvedExpression {
 
     /**
      * Checks whether an {@link Instant} can be represented as a {@code long} epoch value at the
-     * given precision without overflow.
+     * given precision without overflow. Accounts for the sub-second nanos term that {@link
+     * DateTimeUtils#toEpochValue} adds on top of {@code epochSeconds * 10^precision}.
      */
     private static boolean canRepresentAsLong(Instant instant, int precision) {
-        long factor = (long) Math.pow(10, precision);
-        long epochSeconds = instant.getEpochSecond();
-        return factor == 0 || Math.abs(epochSeconds) <= Long.MAX_VALUE / factor;
+        final long factor = (long) Math.pow(10, precision);
+        final long nanoDivisor = (long) Math.pow(10, 9 - precision);
+        final long epochSeconds = instant.getEpochSecond();
+        final long nanoPart = instant.getNano() / nanoDivisor;
+        if (epochSeconds > Long.MAX_VALUE / factor || epochSeconds < Long.MIN_VALUE / factor) {
+            return false;
+        }
+        final long base = epochSeconds * factor;
+        // Negative base + non-negative nanoPart moves toward zero, can't overflow upward.
+        return base < 0 || nanoPart <= Long.MAX_VALUE - base;
     }
 }
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToTimestampLtzTypeStrategy.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToTimestampLtzTypeStrategy.java
index 7481879b670..38cdf4ef23d 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToTimestampLtzTypeStrategy.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToTimestampLtzTypeStrategy.java
@@ -40,6 +40,10 @@ public class ToTimestampLtzTypeStrategy implements TypeStrategy {
     private static final int MAX_PRECISION = 9;
     private static final int DEFAULT_PRECISION = 3;
 
+    private static final int EPOCH_OR_TIMESTAMP_ARG = 0;
+    private static final int PRECISION_OR_FORMAT_ARG = 1;
+    private static final int TIMEZONE_ARG = 2;
+
     @Override
     public Optional<DataType> inferType(CallContext callContext) {
         List<DataType> argumentTypes = callContext.getArgumentDataTypes();
@@ -53,7 +57,7 @@ public class ToTimestampLtzTypeStrategy implements TypeStrategy {
                             + " were provided.");
         }
 
-        LogicalType firstType = argumentTypes.get(0).getLogicalType();
+        LogicalType firstType = argumentTypes.get(EPOCH_OR_TIMESTAMP_ARG).getLogicalType();
         LogicalTypeRoot firstTypeRoot = firstType.getTypeRoot();
         int outputPrecision = DEFAULT_PRECISION;
 
@@ -66,7 +70,8 @@ public class ToTimestampLtzTypeStrategy implements TypeStrategy {
                 }
                 break;
             case 2:
-                LogicalType secondType = argumentTypes.get(1).getLogicalType();
+                LogicalType secondType =
+                        argumentTypes.get(PRECISION_OR_FORMAT_ARG).getLogicalType();
                 LogicalTypeRoot secondTypeRoot = secondType.getTypeRoot();
                 if (firstType.is(LogicalTypeFamily.NUMERIC)) {
                     if (secondTypeRoot != LogicalTypeRoot.INTEGER) {
@@ -74,12 +79,7 @@ public class ToTimestampLtzTypeStrategy implements TypeStrategy {
                                 "Unsupported argument type. "
                                         + "TO_TIMESTAMP_LTZ(<NUMERIC>, <INTEGER>) requires the second argument to be <INTEGER>.");
                     }
-                    Optional<Integer> precisionOpt = callContext.getArgumentValue(1, Integer.class);
-                    if (precisionOpt.isPresent()) {
-                        int precision = precisionOpt.get();
-                        validatePrecision(precision);
-                        outputPrecision = Math.max(precision, DEFAULT_PRECISION);
-                    }
+                    outputPrecision = inferPrecisionFromIntegerArg(callContext);
                 } else if (isCharacterType(firstTypeRoot)) {
                     if (!isCharacterType(secondTypeRoot)) {
                         throw new ValidationException(
@@ -95,8 +95,13 @@ public class ToTimestampLtzTypeStrategy implements TypeStrategy {
                 break;
             case 3:
                 if (!isCharacterType(firstTypeRoot)
-                        || !isCharacterType(argumentTypes.get(1).getLogicalType().getTypeRoot())
-                        || !isCharacterType(argumentTypes.get(2).getLogicalType().getTypeRoot())) {
+                        || !isCharacterType(
+                                argumentTypes
+                                        .get(PRECISION_OR_FORMAT_ARG)
+                                        .getLogicalType()
+                                        .getTypeRoot())
+                        || !isCharacterType(
+                                argumentTypes.get(TIMEZONE_ARG).getLogicalType().getTypeRoot())) {
                     throw new ValidationException(
                             "Unsupported argument type. "
                                     + "When taking 3 arguments, TO_TIMESTAMP_LTZ requires all three arguments to be of type <VARCHAR> or <CHAR>.");
@@ -108,15 +113,45 @@ public class ToTimestampLtzTypeStrategy implements TypeStrategy {
     }
 
     /**
-     * Infers the output precision from a format string literal. Returns at least {@link
+     * Infers the output precision from a precision integer literal.
+     *
+     * <p>Same plan-time literal constraint as {@link #inferPrecisionFromFormat(CallContext)}: when
+     * the precision argument is a non-literal expression, the output defaults to {@link
      * #DEFAULT_PRECISION}.
+     *
+     * @return precision in [{@link #DEFAULT_PRECISION}, {@link #MAX_PRECISION}]
+     */
+    private static int inferPrecisionFromIntegerArg(CallContext callContext) {
+        if (!callContext.isArgumentLiteral(PRECISION_OR_FORMAT_ARG)) {
+            return DEFAULT_PRECISION;
+        }
+        return callContext
+                .getArgumentValue(PRECISION_OR_FORMAT_ARG, Integer.class)
+                .map(
+                        precision -> {
+                            validatePrecision(precision);
+                            return Math.max(precision, DEFAULT_PRECISION);
+                        })
+                .orElse(DEFAULT_PRECISION);
+    }
+
+    /**
+     * Infers the output precision from a format string literal.
+     *
+     * <p>The output type must be deterministic at plan time, so this method can only inspect the
+     * format pattern when it is a literal. When the format is a non-literal expression (e.g., a
+     * column reference) the pattern is unknown until runtime and could vary per row, so we fall
+     * back to {@link #DEFAULT_PRECISION}. The runtime still parses with the actual row pattern, but
+     * any sub-millisecond digits are truncated by the implicit cast to the declared type.
+     *
+     * @return precision in [{@link #DEFAULT_PRECISION}, {@link #MAX_PRECISION}]
      */
     private static int inferPrecisionFromFormat(CallContext callContext) {
-        if (!callContext.isArgumentLiteral(1)) {
+        if (!callContext.isArgumentLiteral(PRECISION_OR_FORMAT_ARG)) {
             return DEFAULT_PRECISION;
         }
         return callContext
-                .getArgumentValue(1, String.class)
+                .getArgumentValue(PRECISION_OR_FORMAT_ARG, String.class)
                 .map(DateTimeUtils::precisionFromFormat)
                 .orElse(DEFAULT_PRECISION);
     }
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/DateTimeUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/DateTimeUtils.java
index 9c0071b6bd9..8a161873e21 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/DateTimeUtils.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/DateTimeUtils.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.utils;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableRuntimeException;
 import org.apache.flink.table.data.DecimalData;
 import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.types.logical.LogicalType;
@@ -131,13 +132,13 @@ public class DateTimeUtils {
     private static final long MIN_EPOCH_MILLS = -62167219200000L;
 
     /** The valid minimum epoch seconds ('0000-01-01 00:00:00 UTC+0'). */
-    private static final long MIN_EPOCH_SECONDS = -62167219200L;
+    public static final long MIN_EPOCH_SECONDS = -62167219200L;
 
     /** The valid maximum epoch milliseconds ('9999-12-31 23:59:59.999 UTC+0'). */
     private static final long MAX_EPOCH_MILLS = 253402300799999L;
 
     /** The valid maximum epoch seconds ('9999-12-31 23:59:59 UTC+0'). */
-    private static final long MAX_EPOCH_SECONDS = 253402300799L;
+    public static final long MAX_EPOCH_SECONDS = 253402300799L;
 
     private static final DateTimeFormatter DEFAULT_TIMESTAMP_FORMATTER =
             new DateTimeFormatterBuilder()
@@ -149,6 +150,8 @@ public class DateTimeUtils {
                     .toFormatter();
 
     private static final int DEFAULT_PRECISION = 3;
+    private static final int MIN_PRECISION = 0;
+    private static final int MAX_PRECISION = 9;
 
     /**
      * A ThreadLocal cache map for SimpleDateFormat, because SimpleDateFormat is not thread-safe.
@@ -337,26 +340,30 @@ public class DateTimeUtils {
      * Converts a numeric epoch value to {@link TimestampData}. The precision specifies the unit of
      * the epoch value: 0 for seconds, 3 for milliseconds, 6 for microseconds, 9 for nanoseconds,
      * and any value in between. Returns {@code null} if the value is out of the valid timestamp
-     * range.
+     * range. Throws a {@link TableRuntimeException} if the precision is outside {@code [0, 9]}.
      */
     public static TimestampData toTimestampData(long epoch, int precision) {
         return epochToTimestampData(epoch, precision);
     }
 
     /**
-     * See {@link #toTimestampData(long, int)}. The double value is first converted to nanoseconds
-     * to preserve fractional parts, then processed at nanosecond precision. Returns {@code null} if
-     * the value is out of the valid timestamp range.
+     * See {@link #toTimestampData(long, int)}. Seconds and the sub-second remainder are computed
+     * separately so that values near the upper bound of the valid range are not lost to long
+     * overflow when scaled to nanoseconds. Returns {@code null} if the value is out of the valid
+     * timestamp range. Throws a {@link TableRuntimeException} if the precision is outside {@code
+     * [0, 9]}.
      */
     public static TimestampData toTimestampData(double epoch, int precision) {
+        validatePrecision(precision);
         double factor = Math.pow(10, precision);
         double epochSeconds = epoch / factor;
         if (epochSeconds < MIN_EPOCH_SECONDS || epochSeconds > MAX_EPOCH_SECONDS) {
             return null;
         }
-        double nanoFactor = Math.pow(10, 9 - precision);
-        long epochNanos = (long) (epoch * nanoFactor);
-        return epochToTimestampData(epochNanos, 9);
+        long seconds = (long) epochSeconds;
+        double fractionalSeconds = epochSeconds - seconds;
+        long nanoAdjustment = (long) (fractionalSeconds * 1_000_000_000L);
+        return TimestampData.fromInstant(Instant.ofEpochSecond(seconds, nanoAdjustment));
     }
 
     /** See {@link #toTimestampData(long, int)}. The decimal value is truncated to a long. */
@@ -366,6 +373,7 @@ public class DateTimeUtils {
     }
 
     private static TimestampData epochToTimestampData(long epoch, int precision) {
+        validatePrecision(precision);
         long factor = (long) Math.pow(10, precision);
         long epochSeconds = Math.floorDiv(epoch, factor);
 
@@ -380,6 +388,15 @@ public class DateTimeUtils {
         return TimestampData.fromInstant(Instant.ofEpochSecond(epochSeconds, nanoAdjustment));
     }
 
+    private static void validatePrecision(int precision) {
+        if (precision < MIN_PRECISION || precision > MAX_PRECISION) {
+            throw new TableRuntimeException(
+                    String.format(
+                            "Precision for TO_TIMESTAMP_LTZ must be between %d and %d but was %d.",
+                            MIN_PRECISION, MAX_PRECISION, precision));
+        }
+    }
+
     /**
      * Converts an {@link Instant} to an epoch value at the given precision. This is the inverse of
      * {@link #toTimestampData(long, int)}.
@@ -391,18 +408,29 @@ public class DateTimeUtils {
     }
 
     /**
-     * Infers fractional second precision from a format pattern by counting trailing 'S' characters.
-     * Returns at least {@link #DEFAULT_PRECISION} (3) and at most 9.
+     * Infers fractional second precision from a format pattern by finding the longest run of 'S'
+     * characters outside quoted literal sections. Returns at least {@link #DEFAULT_PRECISION} (3)
+     * and at most 9.
      */
     public static int precisionFromFormat(String format) {
-        int sCount = 0;
-        for (int i = format.length() - 1; i >= 0; i--) {
-            if (format.charAt(i) != 'S') {
-                break;
+        int maxRun = 0;
+        int run = 0;
+        boolean inQuotes = false;
+        for (int i = 0; i < format.length(); i++) {
+            char c = format.charAt(i);
+            if (c == '\'') {
+                inQuotes = !inQuotes;
+                maxRun = Math.max(maxRun, run);
+                run = 0;
+            } else if (!inQuotes && c == 'S') {
+                run++;
+            } else {
+                maxRun = Math.max(maxRun, run);
+                run = 0;
             }
-            sCount++;
         }
-        return Math.max(Math.min(sCount, 9), DEFAULT_PRECISION);
+        maxRun = Math.max(maxRun, run);
+        return Math.max(Math.min(maxRun, 9), DEFAULT_PRECISION);
     }
 
     // --------------------------------------------------------------------------------------------
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/expressions/ExpressionTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/expressions/ExpressionTest.java
index be87034542b..f39d36fba58 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/expressions/ExpressionTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/expressions/ExpressionTest.java
@@ -355,6 +355,14 @@ class ExpressionTest {
                 Arguments.of(
                         Instant.parse("2262-04-12T00:00:00Z"),
                         9,
-                        "TO_TIMESTAMP_LTZ('2262-04-12 00:00:00.000000000', 'yyyy-MM-dd HH:mm:ss.SSSSSSSSS', 'UTC')"));
+                        "TO_TIMESTAMP_LTZ('2262-04-12 00:00:00.000000000', 'yyyy-MM-dd HH:mm:ss.SSSSSSSSS', 'UTC')"),
+                Arguments.of(
+                        Instant.ofEpochSecond(9223372036L, 900_000_000),
+                        9,
+                        "TO_TIMESTAMP_LTZ('2262-04-11 23:47:16.900000000', 'yyyy-MM-dd HH:mm:ss.SSSSSSSSS', 'UTC')"),
+                Arguments.of(
+                        Instant.ofEpochSecond(-9223372036L, 0),
+                        9,
+                        "TO_TIMESTAMP_LTZ(-9223372036000000000, 9)"));
     }
 }
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/TimeFunctionsITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/TimeFunctionsITCase.java
index d606dc15313..520c960ff44 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/TimeFunctionsITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/TimeFunctionsITCase.java
@@ -23,6 +23,7 @@ import org.apache.flink.table.api.JsonExistsOnError;
 import org.apache.flink.table.data.DecimalDataUtils;
 import org.apache.flink.table.expressions.TimeIntervalUnit;
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.utils.DateTimeUtils;
 
 import java.time.Duration;
 import java.time.Instant;
@@ -1050,6 +1051,134 @@ class TimeFunctionsITCase extends BuiltInFunctionTestBase {
                                 toTimestampLtz(null),
                                 "TO_TIMESTAMP_LTZ(NULL)",
                                 null,
+                                TIMESTAMP_LTZ(3).nullable())
+                        .testResult(
+                                toTimestampLtz(
+                                        "2023-01-01 00:00:00.123456 Z",
+                                        "yyyy-MM-dd HH:mm:ss.SSSSSS X"),
+                                "TO_TIMESTAMP_LTZ('2023-01-01 00:00:00.123456 Z', 'yyyy-MM-dd HH:mm:ss.SSSSSS X')",
+                                LocalDateTime.of(2023, 1, 1, 0, 0, 0, 123_456_000)
+                                        .atZone(ZoneOffset.UTC)
+                                        .toInstant(),
+                                TIMESTAMP_LTZ(6).nullable())
+                        .testResult(
+                                toTimestampLtz(
+                                        "2023-01-01 00:00:00.123456789 Z",
+                                        "yyyy-MM-dd HH:mm:ss.SSSSSSSSS X"),
+                                "TO_TIMESTAMP_LTZ('2023-01-01 00:00:00.123456789 Z', 'yyyy-MM-dd HH:mm:ss.SSSSSSSSS X')",
+                                LocalDateTime.of(2023, 1, 1, 0, 0, 0, 123_456_789)
+                                        .atZone(ZoneOffset.UTC)
+                                        .toInstant(),
+                                TIMESTAMP_LTZ(9).nullable())
+                        .testResult(
+                                toTimestampLtz(
+                                        "2023-01-01 00:00:00.1234Z", "yyyy-MM-dd HH:mm:ss.SSSS'Z'"),
+                                "TO_TIMESTAMP_LTZ('2023-01-01 00:00:00.1234Z', 'yyyy-MM-dd HH:mm:ss.SSSS''Z''')",
+                                LocalDateTime.of(2023, 1, 1, 0, 0, 0, 123_400_000)
+                                        .atZone(ZoneOffset.UTC)
+                                        .toInstant(),
+                                TIMESTAMP_LTZ(4).nullable())
+                        .testResult(
+                                toTimestampLtz(
+                                        "2023-01-01 00:00:00.1234'+0000'",
+                                        "yyyy-MM-dd HH:mm:ss.SSSS''Z''"),
+                                "TO_TIMESTAMP_LTZ('2023-01-01 00:00:00.1234''+0000''', 'yyyy-MM-dd HH:mm:ss.SSSS''''Z''''')",
+                                LocalDateTime.of(2023, 1, 1, 0, 0, 0, 123_400_000)
+                                        .atZone(ZoneOffset.UTC)
+                                        .toInstant(),
+                                TIMESTAMP_LTZ(4).nullable())
+                        .testResult(
+                                toTimestampLtz(
+                                        "2023-01-01T00:00:00.123456",
+                                        "yyyy-MM-dd'T'HH:mm:ss.SSSSSS"),
+                                "TO_TIMESTAMP_LTZ('2023-01-01T00:00:00.123456', 'yyyy-MM-dd''T''HH:mm:ss.SSSSSS')",
+                                LocalDateTime.of(2023, 1, 1, 0, 0, 0, 123_456_000)
+                                        .atZone(ZoneOffset.UTC)
+                                        .toInstant(),
+                                TIMESTAMP_LTZ(6).nullable())
+                        .testResult(
+                                toTimestampLtz(
+                                        "2023-01-01T00:00:00.123456789Z",
+                                        "yyyy-MM-dd'T'HH:mm:ss.SSSSSSSSSX"),
+                                "TO_TIMESTAMP_LTZ('2023-01-01T00:00:00.123456789Z', 'yyyy-MM-dd''T''HH:mm:ss.SSSSSSSSSX')",
+                                LocalDateTime.of(2023, 1, 1, 0, 0, 0, 123_456_789)
+                                        .atZone(ZoneOffset.UTC)
+                                        .toInstant(),
+                                TIMESTAMP_LTZ(9).nullable())
+                        .testResult(
+                                toTimestampLtz(
+                                        "2023-01-01T00:00:00.123456789Z",
+                                        "yyyy-MM-dd'T'HH:mm:ss.SSSSSSSSS'Z'"),
+                                "TO_TIMESTAMP_LTZ('2023-01-01T00:00:00.123456789Z', 'yyyy-MM-dd''T''HH:mm:ss.SSSSSSSSS''Z''')",
+                                LocalDateTime.of(2023, 1, 1, 0, 0, 0, 123_456_789)
+                                        .atZone(ZoneOffset.UTC)
+                                        .toInstant(),
+                                TIMESTAMP_LTZ(9).nullable()),
+                TestSetSpec.forFunction(BuiltInFunctionDefinitions.TO_TIMESTAMP_LTZ)
+                        .onFieldsWithData("yyyy-MM-dd HH:mm:ss.SSSSSSSSS")
+                        .andDataTypes(STRING())
+                        .testResult(
+                                toTimestampLtz("2023-01-01 00:00:00.123456789", $("f0")),
+                                "TO_TIMESTAMP_LTZ('2023-01-01 00:00:00.123456789', f0)",
+                                LocalDateTime.of(2023, 1, 1, 0, 0, 0, 123_000_000)
+                                        .atZone(ZoneOffset.UTC)
+                                        .toInstant(),
+                                TIMESTAMP_LTZ(3).nullable()),
+                TestSetSpec.forFunction(BuiltInFunctionDefinitions.TO_TIMESTAMP_LTZ)
+                        .onFieldsWithData(-1, 3, 6, 10, 1234567L)
+                        .andDataTypes(INT(), INT(), INT(), INT(), BIGINT())
+                        .testTableApiRuntimeError(
+                                toTimestampLtz($("f4"), $("f0")),
+                                "Precision for TO_TIMESTAMP_LTZ must be between 0 and 9 but was -1.")
+                        .testSqlRuntimeError(
+                                "TO_TIMESTAMP_LTZ(f4, f0)",
+                                "Precision for TO_TIMESTAMP_LTZ must be between 0 and 9 but was -1.")
+                        .testResult(
+                                toTimestampLtz($("f4"), $("f1")),
+                                "TO_TIMESTAMP_LTZ(f4, f1)",
+                                LocalDateTime.of(1970, 1, 1, 0, 20, 34, 567_000_000)
+                                        .atZone(ZoneOffset.UTC)
+                                        .toInstant(),
+                                TIMESTAMP_LTZ(3).nullable())
+                        .testResult(
+                                toTimestampLtz($("f4"), $("f2")),
+                                "TO_TIMESTAMP_LTZ(f4, f2)",
+                                LocalDateTime.of(1970, 1, 1, 0, 0, 1, 234_000_000)
+                                        .atZone(ZoneOffset.UTC)
+                                        .toInstant(),
+                                TIMESTAMP_LTZ(3).nullable())
+                        .testTableApiRuntimeError(
+                                toTimestampLtz($("f4"), $("f3")),
+                                "Precision for TO_TIMESTAMP_LTZ must be between 0 and 9 but was 10.")
+                        .testSqlRuntimeError(
+                                "TO_TIMESTAMP_LTZ(f4, f3)",
+                                "Precision for TO_TIMESTAMP_LTZ must be between 0 and 9 but was 10."),
+                TestSetSpec.forFunction(BuiltInFunctionDefinitions.TO_TIMESTAMP_LTZ)
+                        .onFieldsWithData(
+                                (double) DateTimeUtils.MAX_EPOCH_SECONDS,
+                                (double) DateTimeUtils.MIN_EPOCH_SECONDS,
+                                Double.MAX_VALUE,
+                                -Double.MAX_VALUE)
+                        .andDataTypes(DOUBLE(), DOUBLE(), DOUBLE(), DOUBLE())
+                        .testResult(
+                                toTimestampLtz($("f0"), literal(0)),
+                                "TO_TIMESTAMP_LTZ(f0, 0)",
+                                Instant.ofEpochSecond(DateTimeUtils.MAX_EPOCH_SECONDS),
+                                TIMESTAMP_LTZ(3).nullable())
+                        .testResult(
+                                toTimestampLtz($("f1"), literal(0)),
+                                "TO_TIMESTAMP_LTZ(f1, 0)",
+                                Instant.ofEpochSecond(DateTimeUtils.MIN_EPOCH_SECONDS),
+                                TIMESTAMP_LTZ(3).nullable())
+                        .testResult(
+                                toTimestampLtz($("f2"), literal(0)),
+                                "TO_TIMESTAMP_LTZ(f2, 0)",
+                                null,
+                                TIMESTAMP_LTZ(3).nullable())
+                        .testResult(
+                                toTimestampLtz($("f3"), literal(0)),
+                                "TO_TIMESTAMP_LTZ(f3, 0)",
+                                null,
                                 TIMESTAMP_LTZ(3).nullable()));
     }
 }
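
For readers who want to try the new format scan outside a Flink build, the following is a minimal standalone sketch of the precisionFromFormat logic added in the DateTimeUtils hunk above. The class name FormatPrecisionSketch and the local DEFAULT_PRECISION constant are assumptions made for illustration (the Javadoc in the diff gives the default as 3); this is not the Flink class itself.

```java
// Illustrative copy of the scan from the diff; not part of the commit.
final class FormatPrecisionSketch {

    // Assumed to match DateTimeUtils.DEFAULT_PRECISION, which the Javadoc states is 3.
    private static final int DEFAULT_PRECISION = 3;

    private FormatPrecisionSketch() {}

    /** Returns the longest run of unquoted 'S' characters, clamped to [3, 9]. */
    public static int precisionFromFormat(String format) {
        int maxRun = 0;
        int run = 0;
        boolean inQuotes = false;
        for (int i = 0; i < format.length(); i++) {
            char c = format.charAt(i);
            if (c == '\'') {
                // A quote toggles literal mode and ends any current run.
                inQuotes = !inQuotes;
                maxRun = Math.max(maxRun, run);
                run = 0;
            } else if (!inQuotes && c == 'S') {
                run++;
            } else {
                // Any other character (or a quoted 'S') ends the current run.
                maxRun = Math.max(maxRun, run);
                run = 0;
            }
        }
        maxRun = Math.max(maxRun, run);
        return Math.max(Math.min(maxRun, 9), DEFAULT_PRECISION);
    }

    public static void main(String[] args) {
        System.out.println(precisionFromFormat("yyyy-MM-dd HH:mm:ss.SSSSSS X"));       // 6
        System.out.println(precisionFromFormat("yyyy-MM-dd HH:mm:ss.SSSS'Z'"));        // 4
        System.out.println(precisionFromFormat("yyyy-MM-dd'T'HH:mm:ss.SSSSSSSSS'Z'")); // 9
        System.out.println(precisionFromFormat("yyyy-MM-dd HH:mm:ss"));                // 3
        System.out.println(precisionFromFormat("'SSSSSS' HH:mm:ss.S"));                // 3
    }
}
```

The removed loop counted 'S' characters from the end of the pattern and stopped at the first other character, so a pattern ending in " X" or "'Z'" fell back to the default of 3. The scan above handles those patterns, which is what the new TimeFunctionsITCase cases with TIMESTAMP_LTZ(4), (6) and (9) exercise.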