This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new e4d4996551c [FLINK-17224][table] Support precision of `TIME` type
e4d4996551c is described below

commit e4d4996551ce3b097a6c15d7d281a17e3d01be56
Author: Ramin Gharib <[email protected]>
AuthorDate: Mon Sep 1 11:59:20 2025 +0200

    [FLINK-17224][table] Support precision of `TIME` type
    
    Co-Authored-By: Sergey Nuyanzin <[email protected]>
    
    This closes #26954.
---
 .../flink/table/runtime/batch/AvroTypesITCase.java | 18 ++---
 .../json/JsonParserToRowDataConverters.java        |  2 +-
 .../formats/json/JsonToRowDataConverters.java      |  2 +-
 .../formats/json/RowDataToJsonConverters.java      |  2 +-
 .../formats/json/JsonRowDataSerDeSchemaTest.java   | 18 +++--
 .../apache/flink/table/utils/DateTimeUtils.java    |  4 ++
 .../functions/aggfunctions/LeadLagAggFunction.java |  7 +-
 .../functions/aggfunctions/MaxAggFunction.java     |  8 ++-
 .../functions/aggfunctions/MinAggFunction.java     |  8 ++-
 .../aggfunctions/SingleValueAggFunction.java       |  8 ++-
 .../functions/casting/CastRuleProvider.java        |  1 +
 .../functions/casting/TimeToTimeCastRule.java      | 56 +++++++++++++++
 .../table/planner/calcite/FlinkTypeFactory.scala   | 10 +--
 .../planner/codegen/calls/BuiltInMethods.scala     |  3 +
 .../planner/codegen/calls/ScalarOperatorGens.scala | 20 ++++--
 .../planner/plan/utils/AggFunctionFactory.scala    | 12 ++--
 .../planner/functions/CastFunctionITCase.java      | 82 +++++++++++++++++++---
 .../planner/functions/TimeFunctionsITCase.java     | 38 +++++-----
 .../planner/functions/casting/CastRulesTest.java   | 12 ++--
 .../operations/SqlDdlToOperationConverterTest.java | 69 +++++++++---------
 .../plan/batch/sql/MLPredictTableFunctionTest.xml  |  8 +--
 .../plan/stream/sql/MLPredictTableFunctionTest.xml |  8 +--
 .../table/planner/plan/stream/table/ValuesTest.xml |  6 +-
 .../planner/expressions/TemporalTypesTest.scala    |  2 +-
 .../table/runtime/typeutils/TypeCheckUtils.java    |  5 ++
 25 files changed, 284 insertions(+), 125 deletions(-)

diff --git 
a/flink-formats/flink-avro/src/test/java/org/apache/flink/table/runtime/batch/AvroTypesITCase.java
 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/table/runtime/batch/AvroTypesITCase.java
index 53853a10ba1..3c97e226980 100644
--- 
a/flink-formats/flink-avro/src/test/java/org/apache/flink/table/runtime/batch/AvroTypesITCase.java
+++ 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/table/runtime/batch/AvroTypesITCase.java
@@ -27,12 +27,12 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
-import org.apache.flink.test.util.AbstractTestBaseJUnit4;
+import org.apache.flink.test.util.AbstractTestBase;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.util.CollectionUtil;
 
 import org.apache.avro.util.Utf8;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
@@ -51,7 +51,7 @@ import static org.apache.flink.table.api.Expressions.$;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for interoperability with Avro types. */
-public class AvroTypesITCase extends AbstractTestBaseJUnit4 {
+class AvroTypesITCase extends AbstractTestBase {
 
     private static final User USER_1 =
             User.newBuilder()
@@ -79,7 +79,7 @@ public class AvroTypesITCase extends AbstractTestBaseJUnit4 {
                                     .build())
                     .setTypeBytes(ByteBuffer.allocate(10))
                     .setTypeDate(LocalDate.parse("2014-03-01"))
-                    .setTypeTimeMillis(LocalTime.parse("12:12:12"))
+                    .setTypeTimeMillis(LocalTime.parse("12:12:12.345"))
                     
.setTypeTimeMicros(LocalTime.ofSecondOfDay(0).plus(123456L, ChronoUnit.MICROS))
                     
.setTypeTimestampMillis(Instant.parse("2014-03-01T12:12:12.321Z"))
                     .setTypeTimestampMicros(
@@ -152,7 +152,7 @@ public class AvroTypesITCase extends AbstractTestBaseJUnit4 
{
                     .build();
 
     @Test
-    public void testAvroToRow() throws Exception {
+    void testAvroToRow() throws Exception {
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
         StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
 
@@ -167,7 +167,7 @@ public class AvroTypesITCase extends AbstractTestBaseJUnit4 
{
         String expected =
                 "+I[Charlie, null, blue, 1337, 1.337, null, false, [], [], 
null, RED, {}, null, null, "
                         + "{\"num\": 42, \"street\": \"Bakerstreet\", 
\"city\": \"Berlin\", \"state\": \"Berlin\", \"zip\": \"12049\"}, "
-                        + "java.nio.HeapByteBuffer[pos=0 lim=10 cap=10], 
2014-03-01, 12:12:12, 00:00:00.123456, 2014-03-01T12:12:12.321Z, "
+                        + "java.nio.HeapByteBuffer[pos=0 lim=10 cap=10], 
2014-03-01, 12:12:12.345, 00:00:00.123456, 2014-03-01T12:12:12.321Z, "
                         + "1970-01-01T00:00:00.123456Z, 
java.nio.HeapByteBuffer[pos=0 lim=2 cap=2], [7, -48]]\n"
                         + "+I[Whatever, null, black, 42, 0.0, null, true, 
[hello], [true], null, GREEN, {}, "
                         + "[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], 
null, null, "
@@ -181,7 +181,7 @@ public class AvroTypesITCase extends AbstractTestBaseJUnit4 
{
     }
 
     @Test
-    public void testAvroStringAccess() {
+    void testAvroStringAccess() {
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
         StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
 
@@ -198,7 +198,7 @@ public class AvroTypesITCase extends AbstractTestBaseJUnit4 
{
     }
 
     @Test
-    public void testAvroObjectAccess() throws Exception {
+    void testAvroObjectAccess() throws Exception {
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
         StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
 
@@ -217,7 +217,7 @@ public class AvroTypesITCase extends AbstractTestBaseJUnit4 
{
     }
 
     @Test
-    public void testAvroToAvro() throws Exception {
+    void testAvroToAvro() throws Exception {
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
         StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
 
diff --git 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonParserToRowDataConverters.java
 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonParserToRowDataConverters.java
index 6556502f0d4..ea5ced4c905 100644
--- 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonParserToRowDataConverters.java
+++ 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonParserToRowDataConverters.java
@@ -266,7 +266,7 @@ public class JsonParserToRowDataConverters implements 
Serializable {
         LocalTime localTime = parsedTime.query(TemporalQueries.localTime());
 
         // get number of milliseconds of the day
-        return localTime.toSecondOfDay() * 1000;
+        return (int) (localTime.toNanoOfDay() / 1000_000);
     }
 
     private TimestampData convertToTimestamp(JsonParser jp) throws IOException 
{
diff --git 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonToRowDataConverters.java
 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonToRowDataConverters.java
index 5da937e43c7..653b8b3621b 100644
--- 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonToRowDataConverters.java
+++ 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonToRowDataConverters.java
@@ -214,7 +214,7 @@ public class JsonToRowDataConverters implements 
Serializable {
         LocalTime localTime = parsedTime.query(TemporalQueries.localTime());
 
         // get number of milliseconds of the day
-        return localTime.toSecondOfDay() * 1000;
+        return (int) (localTime.toNanoOfDay() / 1000_000);
     }
 
     private TimestampData convertToTimestamp(JsonNode jsonNode) {
diff --git 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/RowDataToJsonConverters.java
 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/RowDataToJsonConverters.java
index 2c64c89cd8d..11ab01a35e8 100644
--- 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/RowDataToJsonConverters.java
+++ 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/RowDataToJsonConverters.java
@@ -177,7 +177,7 @@ public class RowDataToJsonConverters implements 
Serializable {
     private RowDataToJsonConverter createTimeConverter() {
         return (mapper, reuse, value) -> {
             int millisecond = (int) value;
-            LocalTime time = LocalTime.ofSecondOfDay(millisecond / 1000L);
+            LocalTime time = LocalTime.ofNanoOfDay(millisecond * 1000_000L);
             return 
mapper.getNodeFactory().textNode(SQL_TIME_FORMAT.format(time));
         };
     }
diff --git 
a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java
 
b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java
index 94f59dbba36..0012ede7344 100644
--- 
a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java
+++ 
b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java
@@ -118,6 +118,7 @@ public class JsonRowDataSerDeSchemaTest {
         Double[] doubles = new Double[] {1.1, 2.2, 3.3};
         LocalDate date = LocalDate.parse("1990-10-14");
         LocalTime time = LocalTime.parse("12:12:43");
+        LocalTime time3 = LocalTime.parse("12:12:43.123");
         Timestamp timestamp3 = Timestamp.valueOf("1990-10-14 12:12:43.123");
         Timestamp timestamp9 = Timestamp.valueOf("1990-10-14 
12:12:43.123456789");
         Instant timestampWithLocalZone =
@@ -152,6 +153,7 @@ public class JsonRowDataSerDeSchemaTest {
         root.set("doubles", doubleNode);
         root.put("date", "1990-10-14");
         root.put("time", "12:12:43");
+        root.put("time3", "12:12:43.123");
         root.put("timestamp3", "1990-10-14T12:12:43.123");
         root.put("timestamp9", "1990-10-14T12:12:43.123456789");
         root.put("timestampWithLocalZone", "1990-10-14T12:12:43.123456789Z");
@@ -175,6 +177,7 @@ public class JsonRowDataSerDeSchemaTest {
                         FIELD("doubles", ARRAY(DOUBLE())),
                         FIELD("date", DATE()),
                         FIELD("time", TIME(0)),
+                        FIELD("time3", TIME(3)),
                         FIELD("timestamp3", TIMESTAMP(3)),
                         FIELD("timestamp9", TIMESTAMP(9)),
                         FIELD("timestampWithLocalZone", 
TIMESTAMP_WITH_LOCAL_TIME_ZONE(9)),
@@ -188,7 +191,7 @@ public class JsonRowDataSerDeSchemaTest {
                         isJsonParser, schema, false, false, 
TimestampFormat.ISO_8601);
         open(deserializationSchema);
 
-        Row expected = new Row(18);
+        Row expected = new Row(19);
         expected.setField(0, true);
         expected.setField(1, tinyint);
         expected.setField(2, smallint);
@@ -201,12 +204,13 @@ public class JsonRowDataSerDeSchemaTest {
         expected.setField(9, doubles);
         expected.setField(10, date);
         expected.setField(11, time);
-        expected.setField(12, timestamp3.toLocalDateTime());
-        expected.setField(13, timestamp9.toLocalDateTime());
-        expected.setField(14, timestampWithLocalZone);
-        expected.setField(15, map);
-        expected.setField(16, multiSet);
-        expected.setField(17, nestedMap);
+        expected.setField(12, time3);
+        expected.setField(13, timestamp3.toLocalDateTime());
+        expected.setField(14, timestamp9.toLocalDateTime());
+        expected.setField(15, timestampWithLocalZone);
+        expected.setField(16, map);
+        expected.setField(17, multiSet);
+        expected.setField(18, nestedMap);
 
         RowData rowData = deserializationSchema.deserialize(serializedJson);
         Row actual = convertToExternal(rowData, dataType);
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/DateTimeUtils.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/DateTimeUtils.java
index aaf2b075266..5f9eb01f57a 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/DateTimeUtils.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/DateTimeUtils.java
@@ -1698,6 +1698,10 @@ public class DateTimeUtils {
         }
     }
 
+    public static int truncate(int time, int precision) {
+        return (int) zeroLastDigits(time, 3 - precision);
+    }
+
     private static long zeroLastDigits(long l, int n) {
         long tenToTheN = (long) Math.pow(10, n);
         return (l / tenToTheN) * tenToTheN;
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/LeadLagAggFunction.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/LeadLagAggFunction.java
index 9d4ffde91ca..aefddea9ba2 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/LeadLagAggFunction.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/LeadLagAggFunction.java
@@ -264,13 +264,16 @@ public abstract class LeadLagAggFunction extends 
DeclarativeAggregateFunction {
     /** TimeLeadLagAggFunction. */
     public static class TimeLeadLagAggFunction extends LeadLagAggFunction {
 
-        public TimeLeadLagAggFunction(int operandCount) {
+        private final TimeType type;
+
+        public TimeLeadLagAggFunction(TimeType type, int operandCount) {
             super(operandCount);
+            this.type = type;
         }
 
         @Override
         public DataType getResultType() {
-            return DataTypes.TIME(TimeType.DEFAULT_PRECISION);
+            return DataTypes.TIME(type.getPrecision());
         }
     }
 
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/MaxAggFunction.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/MaxAggFunction.java
index a2de172610f..5d09664442d 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/MaxAggFunction.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/MaxAggFunction.java
@@ -191,9 +191,15 @@ public abstract class MaxAggFunction extends 
DeclarativeAggregateFunction {
 
     /** Built-in Time Max aggregate function. */
     public static class TimeMaxAggFunction extends MaxAggFunction {
+        private final TimeType type;
+
+        public TimeMaxAggFunction(TimeType type) {
+            this.type = type;
+        }
+
         @Override
         public DataType getResultType() {
-            return DataTypes.TIME(TimeType.DEFAULT_PRECISION);
+            return DataTypes.TIME(type.getPrecision());
         }
     }
 
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/MinAggFunction.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/MinAggFunction.java
index 19ee48d233f..80a6090a819 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/MinAggFunction.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/MinAggFunction.java
@@ -185,9 +185,15 @@ public abstract class MinAggFunction extends 
DeclarativeAggregateFunction {
 
     /** Built-in Time Min aggregate function. */
     public static class TimeMinAggFunction extends MinAggFunction {
+        private final TimeType type;
+
+        public TimeMinAggFunction(TimeType type) {
+            this.type = type;
+        }
+
         @Override
         public DataType getResultType() {
-            return DataTypes.TIME(TimeType.DEFAULT_PRECISION);
+            return DataTypes.TIME(type.getPrecision());
         }
     }
 
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/SingleValueAggFunction.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/SingleValueAggFunction.java
index b6b983bd12b..d1cd7b0e20b 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/SingleValueAggFunction.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/SingleValueAggFunction.java
@@ -272,9 +272,15 @@ public abstract class SingleValueAggFunction extends 
DeclarativeAggregateFunctio
 
         private static final long serialVersionUID = 320495723666949978L;
 
+        private final TimeType type;
+
+        public TimeSingleValueAggFunction(TimeType type) {
+            this.type = type;
+        }
+
         @Override
         public DataType getResultType() {
-            return DataTypes.TIME(TimeType.DEFAULT_PRECISION);
+            return DataTypes.TIME(type.getPrecision());
         }
     }
 
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CastRuleProvider.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CastRuleProvider.java
index b1a050cd744..7b94816696e 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CastRuleProvider.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CastRuleProvider.java
@@ -87,6 +87,7 @@ public class CastRuleProvider {
                 .addRule(TimeToTimestampCastRule.INSTANCE)
                 .addRule(NumericToTimestampCastRule.INSTANCE)
                 .addRule(TimestampToNumericCastRule.INSTANCE)
+                .addRule(TimeToTimeCastRule.INSTANCE)
                 // To binary rules
                 .addRule(BinaryToBinaryCastRule.INSTANCE)
                 .addRule(RawToBinaryCastRule.INSTANCE)
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/TimeToTimeCastRule.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/TimeToTimeCastRule.java
new file mode 100644
index 00000000000..25d07ded6c7
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/TimeToTimeCastRule.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.casting;
+
+import org.apache.flink.table.planner.codegen.calls.BuiltInMethods;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
+
+import static 
org.apache.flink.table.planner.functions.casting.CastRuleUtils.staticCall;
+
+/**
+ * {@link LogicalTypeRoot#TIME_WITHOUT_TIME_ZONE} to {@link 
LogicalTypeRoot#TIME_WITHOUT_TIME_ZONE}.
+ */
+class TimeToTimeCastRule extends 
AbstractExpressionCodeGeneratorCastRule<Number, Number> {
+
+    static final TimeToTimeCastRule INSTANCE = new TimeToTimeCastRule();
+
+    private TimeToTimeCastRule() {
+        super(
+                CastRulePredicate.builder()
+                        .input(LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE)
+                        .target(LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE)
+                        .build());
+    }
+
+    @Override
+    public String generateExpression(
+            CodeGeneratorCastRule.Context context,
+            String inputTerm,
+            LogicalType inputLogicalType,
+            LogicalType targetLogicalType) {
+        final int inputPrecision = 
LogicalTypeChecks.getPrecision(inputLogicalType);
+        int targetPrecision = 
LogicalTypeChecks.getPrecision(targetLogicalType);
+
+        return inputPrecision <= targetPrecision
+                ? inputTerm
+                : staticCall(BuiltInMethods.TRUNCATE_SQL_TIME(), inputTerm, 
targetPrecision);
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkTypeFactory.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkTypeFactory.scala
index 9949e91cbaa..44de1c7b1dc 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkTypeFactory.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkTypeFactory.scala
@@ -24,7 +24,6 @@ import org.apache.flink.table.legacy.api.TableSchema
 import org.apache.flink.table.legacy.types.logical.TypeInformationRawType
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory.toLogicalType
 import org.apache.flink.table.planner.plan.schema._
-import org.apache.flink.table.planner.utils.JavaScalaConversionUtil
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil.toScala
 import org.apache.flink.table.runtime.types.{LogicalTypeDataTypeConverter, 
PlannerTypeUtils}
 import org.apache.flink.table.types.logical._
@@ -87,7 +86,8 @@ class FlinkTypeFactory(
 
       // temporal types
       case LogicalTypeRoot.DATE => createSqlType(DATE)
-      case LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE => createSqlType(TIME)
+      case LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE =>
+        createSqlType(TIME, t.asInstanceOf[TimeType].getPrecision)
 
       // interval types
       case LogicalTypeRoot.INTERVAL_YEAR_MONTH =>
@@ -630,11 +630,7 @@ object FlinkTypeFactory {
       // temporal types
       case DATE => new DateType()
       case TIME =>
-        if (relDataType.getPrecision > 3) {
-          throw new TableException(s"TIME precision is not supported: 
${relDataType.getPrecision}")
-        }
-        // the planner supports precision 3, but for consistency with old 
planner, we set it to 0.
-        new TimeType()
+        new TimeType(relDataType.getPrecision)
       case TIMESTAMP =>
         new TimestampType(relDataType.getPrecision)
       case TIMESTAMP_WITH_LOCAL_TIME_ZONE =>
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala
index e434e5ee989..a6392ae4a4d 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala
@@ -453,6 +453,9 @@ object BuiltInMethods {
   val TRUNCATE_SQL_TIMESTAMP =
     Types.lookupMethod(classOf[DateTimeUtils], "truncate", 
classOf[TimestampData], classOf[Int])
 
+  val TRUNCATE_SQL_TIME =
+    Types.lookupMethod(classOf[DateTimeUtils], "truncate", classOf[Int], 
classOf[Int])
+
   val ADD_MONTHS =
     Types.lookupMethod(classOf[DateTimeUtils], "addMonths", classOf[Long], 
classOf[Int])
 
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
index 11b1cf199ab..208c0d8d92c 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
@@ -181,14 +181,22 @@ object ScalarOperatorGens {
         }
 
       case (TIME_WITHOUT_TIME_ZONE, INTERVAL_DAY_TIME) =>
-        generateOperatorIfNotNull(ctx, new TimeType(), left, right) {
+        generateOperatorIfNotNull(
+          ctx,
+          new TimeType(LogicalTypeChecks.getPrecision(left.resultType)),
+          left,
+          right) {
           (l, r) =>
             s"java.lang.Math.toIntExact((($l + ${MILLIS_PER_DAY}L) $op (" +
               s"java.lang.Math.toIntExact($r % ${MILLIS_PER_DAY}L))) % 
${MILLIS_PER_DAY}L)"
         }
 
       case (TIME_WITHOUT_TIME_ZONE, INTERVAL_YEAR_MONTH) =>
-        generateOperatorIfNotNull(ctx, new TimeType(), left, right)((l, r) => 
s"$l")
+        generateOperatorIfNotNull(
+          ctx,
+          new TimeType(LogicalTypeChecks.getPrecision(left.resultType)),
+          left,
+          right)((l, r) => s"$l")
 
       case (TIMESTAMP_WITHOUT_TIME_ZONE | TIMESTAMP_WITH_LOCAL_TIME_ZONE, 
INTERVAL_DAY_TIME) =>
         generateOperatorIfNotNull(ctx, left.resultType, left, right) {
@@ -609,9 +617,11 @@ object ScalarOperatorGens {
           }
       }
       // both sides are numeric
-      else if (isNumeric(left.resultType) && isNumeric(right.resultType)) {
-        (leftTerm, rightTerm) => s"$leftTerm $operator $rightTerm"
-      }
+      else if (
+        isNumeric(left.resultType) && isNumeric(right.resultType)
+        || isTime(left.resultType) &&
+        isTime(right.resultType)
+      ) { (leftTerm, rightTerm) => s"$leftTerm $operator $rightTerm" }
 
       // both sides are timestamp
       else if (isTimestamp(left.resultType) && isTimestamp(right.resultType)) {
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala
index a157ea591ec..ce6bbece424 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala
@@ -318,7 +318,8 @@ class AggFunctionFactory(
         case DATE =>
           new MinAggFunction.DateMinAggFunction
         case TIME_WITHOUT_TIME_ZONE =>
-          new MinAggFunction.TimeMinAggFunction
+          val t = argTypes(0).asInstanceOf[TimeType]
+          new MinAggFunction.TimeMinAggFunction(t)
         case TIMESTAMP_WITHOUT_TIME_ZONE =>
           val d = argTypes(0).asInstanceOf[TimestampType]
           new MinAggFunction.TimestampMinAggFunction(d)
@@ -377,7 +378,8 @@ class AggFunctionFactory(
       case DATE =>
         new LeadLagAggFunction.DateLeadLagAggFunction(argTypes.length)
       case TIME_WITHOUT_TIME_ZONE =>
-        new LeadLagAggFunction.TimeLeadLagAggFunction(argTypes.length)
+        val t = argTypes(0).asInstanceOf[TimeType]
+        new LeadLagAggFunction.TimeLeadLagAggFunction(t, argTypes.length)
       case TIMESTAMP_WITHOUT_TIME_ZONE =>
         val d = argTypes(0).asInstanceOf[TimestampType]
         new LeadLagAggFunction.TimestampLeadLagAggFunction(argTypes.length, d)
@@ -427,7 +429,8 @@ class AggFunctionFactory(
         case DATE =>
           new MaxAggFunction.DateMaxAggFunction
         case TIME_WITHOUT_TIME_ZONE =>
-          new MaxAggFunction.TimeMaxAggFunction
+          val t = argTypes(0).asInstanceOf[TimeType]
+          new MaxAggFunction.TimeMaxAggFunction(t)
         case TIMESTAMP_WITHOUT_TIME_ZONE =>
           val d = argTypes(0).asInstanceOf[TimestampType]
           new MaxAggFunction.TimestampMaxAggFunction(d)
@@ -520,7 +523,8 @@ class AggFunctionFactory(
       case DATE =>
         new DateSingleValueAggFunction
       case TIME_WITHOUT_TIME_ZONE =>
-        new TimeSingleValueAggFunction
+        val t = argTypes(0).asInstanceOf[TimeType]
+        new TimeSingleValueAggFunction(t)
       case TIMESTAMP_WITHOUT_TIME_ZONE =>
         val d = argTypes(0).asInstanceOf[TimestampType]
         new TimestampSingleValueAggFunction(d)
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionITCase.java
index a5b31c0dbac..462b25eab8a 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionITCase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionITCase.java
@@ -724,10 +724,10 @@ public class CastFunctionITCase extends 
BuiltInFunctionTestBase {
                         .failRuntime(STRING(), "123:45", 
DateTimeException.class)
                         .failRuntime(STRING(), "2021-09-27", 
DateTimeException.class)
                         .failRuntime(STRING(), "2021-09-27 12:34:56", 
DateTimeException.class)
-                        // https://issues.apache.org/jira/browse/FLINK-17224 
Fractional seconds are
-                        // lost
                         .fromCase(STRING(), "23", LocalTime.of(23, 0, 0, 0))
                         .fromCase(STRING(), "23:45", LocalTime.of(23, 45, 0, 
0))
+                        // https://issues.apache.org/jira/browse/FLINK-39214
+                        // Fractional seconds below milliseconds are lost
                         .fromCase(STRING(), "12:34:56.123456789", 
LocalTime.of(12, 34, 56, 0))
                         .failRuntime(
                                 STRING(), "2021-09-27 12:34:56.123456789", 
DateTimeException.class)
@@ -764,6 +764,67 @@ public class CastFunctionITCase extends 
BuiltInFunctionTestBase {
                         // ROW
                         // RAW
                         .build(),
+                CastTestSpecBuilder.testCastTo(TIME(3))
+                        .fromCase(TIME(), null, null)
+                        .failRuntime(CHAR(3), "foo", DateTimeException.class)
+                        .failRuntime(VARCHAR(5), "Flink", 
DateTimeException.class)
+                        .failRuntime(STRING(), "Flink", 
DateTimeException.class)
+                        .failRuntime(STRING(), "123", DateTimeException.class)
+                        .failRuntime(STRING(), "123:45", 
DateTimeException.class)
+                        .failRuntime(STRING(), "2021-09-27", 
DateTimeException.class)
+                        .failRuntime(STRING(), "2021-09-27 12:34:56", 
DateTimeException.class)
+                        .fromCase(STRING(), "23", LocalTime.of(23, 0, 0, 0))
+                        .fromCase(STRING(), "23:45", LocalTime.of(23, 45, 0, 
0))
+                        .fromCase(
+                                STRING(),
+                                "12:34:56.123456789",
+                                LocalTime.of(12, 34, 56, 123_000_000))
+                        .failRuntime(
+                                STRING(), "2021-09-27 12:34:56.123456789", 
DateTimeException.class)
+                        // Not supported - no fix
+                        .failValidation(BOOLEAN(), true)
+                        .failTableApiValidation(BINARY(2), DEFAULT_BINARY)
+                        .failTableApiValidation(VARBINARY(5), 
DEFAULT_VARBINARY)
+                        .failTableApiValidation(BYTES(), DEFAULT_BYTES)
+                        .failValidation(DECIMAL(5, 3), 12.345)
+                        .failValidation(TINYINT(), DEFAULT_POSITIVE_TINY_INT)
+                        .failValidation(SMALLINT(), DEFAULT_POSITIVE_SMALL_INT)
+                        .failValidation(INT(), DEFAULT_POSITIVE_INT)
+                        .failValidation(BIGINT(), DEFAULT_POSITIVE_BIGINT)
+                        .failValidation(FLOAT(), DEFAULT_POSITIVE_FLOAT)
+                        .failValidation(DOUBLE(), DEFAULT_POSITIVE_DOUBLE)
+                        .failValidation(DATE(), DEFAULT_DATE)
+                        .fromCase(TIME(5), DEFAULT_TIME, LocalTime.of(12, 34, 
56, 123_000_000))
+                        .fromCase(
+                                TIMESTAMP(),
+                                DEFAULT_TIMESTAMP,
+                                LocalTime.of(12, 34, 56, 123_000_000))
+                        .fromCase(
+                                TIMESTAMP(4),
+                                DEFAULT_TIMESTAMP,
+                                LocalTime.of(12, 34, 56, 123_000_000))
+
+                        // https://issues.apache.org/jira/browse/FLINK-20869
+                        // TIMESTAMP_WITH_TIME_ZONE
+
+                        // https://issues.apache.org/jira/browse/FLINK-24422 - 
Accept only Instant
+                        .fromCase(
+                                TIMESTAMP_LTZ(4),
+                                DEFAULT_TIMESTAMP,
+                                LocalTime.of(12, 34, 56, 123_000_000))
+                        .fromCase(
+                                TIMESTAMP_LTZ(4),
+                                DEFAULT_TIMESTAMP_LTZ,
+                                LocalTime.of(7, 54, 56, 123_000_000))
+                        // Not supported - no fix
+                        .failValidation(INTERVAL(YEAR(), MONTH()), 
DEFAULT_INTERVAL_YEAR)
+                        .failValidation(INTERVAL(DAY(), SECOND()), 
DEFAULT_INTERVAL_DAY)
+                        .failValidation(ARRAY(INT()), DEFAULT_ARRAY)
+                        // MULTISET
+                        // MAP
+                        // ROW
+                        // RAW
+                        .build(),
                 CastTestSpecBuilder.testCastTo(TIMESTAMP(9))
                         .fromCase(TIMESTAMP(), null, null)
                         .failRuntime(CHAR(3), "foo", DateTimeException.class)
@@ -790,12 +851,14 @@ public class CastFunctionITCase extends 
BuiltInFunctionTestBase {
                         //
                         .fromCase(DATE(), DEFAULT_DATE, LocalDateTime.of(2021, 
9, 24, 0, 0, 0, 0))
 
-                        // https://issues.apache.org/jira/browse/FLINK-17224 
Fractional seconds are
-                        // lost
+                        // https://issues.apache.org/jira/browse/FLINK-39214
+                        // Fractional seconds below milliseconds are lost
                         // https://issues.apache.org/jira/browse/FLINK-24423 
Continue using EPOCH
                         // date or use 0 for the year?
                         .fromCase(
-                                TIME(5), DEFAULT_TIME, LocalDateTime.of(1970, 
1, 1, 12, 34, 56, 0))
+                                TIME(5),
+                                DEFAULT_TIME,
+                                LocalDateTime.of(1970, 1, 1, 12, 34, 56, 
123_000_000))
                         .fromCase(
                                 TIMESTAMP(),
                                 DEFAULT_TIMESTAMP,
@@ -868,14 +931,13 @@ public class CastFunctionITCase extends 
BuiltInFunctionTestBase {
                                 DEFAULT_DATE,
                                 fromLocalToUTC(LocalDateTime.of(2021, 9, 24, 
0, 0, 0, 0)))
 
-                        // https://issues.apache.org/jira/browse/FLINK-17224 
Fractional seconds are
-                        // lost
                         // https://issues.apache.org/jira/browse/FLINK-24423 
Continue using EPOCH
                         // date or use 0 for the year?
                         .fromCase(
                                 TIME(5),
                                 DEFAULT_TIME,
-                                fromLocalToUTC(LocalDateTime.of(1970, 1, 1, 
12, 34, 56, 0)))
+                                fromLocalToUTC(
+                                        LocalDateTime.of(1970, 1, 1, 12, 34, 
56, 123_000_000)))
                         .fromCase(
                                 TIMESTAMP(),
                                 DEFAULT_TIMESTAMP,
@@ -1039,9 +1101,7 @@ public class CastFunctionITCase extends 
BuiltInFunctionTestBase {
                                 DEFAULT_NEGATIVE_DOUBLE,
                                 String.valueOf(DEFAULT_NEGATIVE_DOUBLE))
                         .fromCase(DATE(), DEFAULT_DATE, "2021-09-24")
-                        // https://issues.apache.org/jira/browse/FLINK-17224 
Currently, fractional
-                        // seconds are lost
-                        .fromCase(TIME(5), DEFAULT_TIME, "12:34:56")
+                        .fromCase(TIME(5), DEFAULT_TIME, "12:34:56.123")
                         .fromCase(TIMESTAMP(), DEFAULT_TIMESTAMP, "2021-09-24 
12:34:56.123456")
                         .fromCase(TIMESTAMP(9), DEFAULT_TIMESTAMP, "2021-09-24 
12:34:56.123456700")
                         .fromCase(TIMESTAMP(4), DEFAULT_TIMESTAMP, "2021-09-24 
12:34:56.1234")
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/TimeFunctionsITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/TimeFunctionsITCase.java
index fc500d9a518..fe4b766ffd0 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/TimeFunctionsITCase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/TimeFunctionsITCase.java
@@ -355,11 +355,11 @@ class TimeFunctionsITCase extends BuiltInFunctionTestBase 
{
         return Stream.of(
                 
TestSetSpec.forFunction(BuiltInFunctionDefinitions.TEMPORAL_OVERLAPS)
                         .onFieldsWithData(
-                                LocalTime.of(2, 55, 0),
+                                LocalTime.of(2, 55, 0, 123_000_000),
                                 Duration.ofHours(1),
                                 LocalTime.of(3, 30, 0),
                                 Duration.ofHours(2))
-                        .andDataTypes(TIME(), INTERVAL(HOUR()), TIME(), 
INTERVAL(HOUR()))
+                        .andDataTypes(TIME(3), INTERVAL(HOUR()), TIME(3), 
INTERVAL(HOUR()))
                         .testResult(
                                 temporalOverlaps($("f0"), $("f1"), $("f2"), 
$("f3")),
                                 "(f0, f1) OVERLAPS (f2, f3)",
@@ -450,18 +450,16 @@ class TimeFunctionsITCase extends BuiltInFunctionTestBase 
{
         return Stream.of(
                 TestSetSpec.forFunction(BuiltInFunctionDefinitions.FLOOR)
                         .onFieldsWithData(
-                                // 
https://issues.apache.org/jira/browse/FLINK-17224
-                                // Fractional seconds are lost
-                                LocalTime.of(11, 22, 33),
+                                LocalTime.of(11, 22, 33, 123_456_789),
                                 LocalDate.of(1990, 10, 14),
                                 LocalDateTime.of(2020, 2, 29, 1, 56, 59, 
987654321),
                                 LocalDateTime.of(2021, 9, 24, 9, 20, 50, 
924325471))
-                        .andDataTypes(TIME(), DATE(), TIMESTAMP(), TIMESTAMP())
+                        .andDataTypes(TIME(3), DATE(), TIMESTAMP(), 
TIMESTAMP())
                         .testResult(
                                 $("f0").ceil(TimeIntervalUnit.MILLISECOND),
                                 "CEIL(f0 TO MILLISECOND)",
-                                LocalTime.of(11, 22, 33),
-                                TIME().nullable())
+                                LocalTime.of(11, 22, 33, 123_000_000),
+                                TIME(3).nullable())
                         .testResult(
                                 $("f1").ceil(TimeIntervalUnit.MILLISECOND),
                                 "CEIL(f1 TO MILLISECOND)",
@@ -475,8 +473,8 @@ class TimeFunctionsITCase extends BuiltInFunctionTestBase {
                         .testResult(
                                 $("f0").ceil(TimeIntervalUnit.SECOND),
                                 "CEIL(f0 TO SECOND)",
-                                LocalTime.of(11, 22, 33),
-                                TIME().nullable())
+                                LocalTime.of(11, 22, 34),
+                                TIME(3).nullable())
                         .testResult(
                                 $("f1").ceil(TimeIntervalUnit.SECOND),
                                 "CEIL(f1 TO SECOND)",
@@ -491,7 +489,7 @@ class TimeFunctionsITCase extends BuiltInFunctionTestBase {
                                 $("f0").ceil(TimeIntervalUnit.MINUTE),
                                 "CEIL(f0 TO MINUTE)",
                                 LocalTime.of(11, 23),
-                                TIME().nullable())
+                                TIME(3).nullable())
                         .testResult(
                                 $("f1").ceil(TimeIntervalUnit.MINUTE),
                                 "CEIL(f1 TO MINUTE)",
@@ -506,7 +504,7 @@ class TimeFunctionsITCase extends BuiltInFunctionTestBase {
                                 $("f0").ceil(TimeIntervalUnit.HOUR),
                                 "CEIL(f0 TO HOUR)",
                                 LocalTime.of(12, 0),
-                                TIME().nullable())
+                                TIME(3).nullable())
                         .testResult(
                                 $("f1").ceil(TimeIntervalUnit.HOUR),
                                 "CEIL(f1 TO HOUR)",
@@ -635,17 +633,15 @@ class TimeFunctionsITCase extends BuiltInFunctionTestBase 
{
         return Stream.of(
                 TestSetSpec.forFunction(BuiltInFunctionDefinitions.FLOOR)
                         .onFieldsWithData(
-                                // 
https://issues.apache.org/jira/browse/FLINK-17224
-                                // Fractional seconds are lost
-                                LocalTime.of(11, 22, 33),
+                                LocalTime.of(11, 22, 33, 123_456_789),
                                 LocalDate.of(1990, 10, 14),
                                 LocalDateTime.of(2020, 2, 29, 1, 56, 59, 
987654321))
-                        .andDataTypes(TIME(), DATE(), TIMESTAMP())
+                        .andDataTypes(TIME(3), DATE(), TIMESTAMP())
                         .testResult(
                                 $("f0").floor(TimeIntervalUnit.MILLISECOND),
                                 "FLOOR(f0 TO MILLISECOND)",
-                                LocalTime.of(11, 22, 33),
-                                TIME().nullable())
+                                LocalTime.of(11, 22, 33, 123_000_000),
+                                TIME(3).nullable())
                         .testResult(
                                 $("f1").floor(TimeIntervalUnit.MILLISECOND),
                                 "FLOOR(f1 TO MILLISECOND)",
@@ -660,7 +656,7 @@ class TimeFunctionsITCase extends BuiltInFunctionTestBase {
                                 $("f0").floor(TimeIntervalUnit.SECOND),
                                 "FLOOR(f0 TO SECOND)",
                                 LocalTime.of(11, 22, 33),
-                                TIME().nullable())
+                                TIME(3).nullable())
                         .testResult(
                                 $("f1").floor(TimeIntervalUnit.SECOND),
                                 "FLOOR(f1 TO SECOND)",
@@ -675,7 +671,7 @@ class TimeFunctionsITCase extends BuiltInFunctionTestBase {
                                 $("f0").floor(TimeIntervalUnit.MINUTE),
                                 "FLOOR(f0 TO MINUTE)",
                                 LocalTime.of(11, 22),
-                                TIME().nullable())
+                                TIME(3).nullable())
                         .testResult(
                                 $("f1").floor(TimeIntervalUnit.MINUTE),
                                 "FLOOR(f1 TO MINUTE)",
@@ -690,7 +686,7 @@ class TimeFunctionsITCase extends BuiltInFunctionTestBase {
                                 $("f0").floor(TimeIntervalUnit.HOUR),
                                 "FLOOR(f0 TO HOUR)",
                                 LocalTime.of(11, 0),
-                                TIME().nullable())
+                                TIME(3).nullable())
                         .testResult(
                                 $("f1").floor(TimeIntervalUnit.HOUR),
                                 "FLOOR(f1 TO HOUR)",
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRulesTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRulesTest.java
index 768736d4c63..82b0d7b0af7 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRulesTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRulesTest.java
@@ -475,8 +475,8 @@ class CastRulesTest {
                                 STRING(),
                                 fromString("2021-09-27 12:34:56"),
                                 TableRuntimeException.class)
-                        // https://issues.apache.org/jira/browse/FLINK-17224 
Currently, fractional
-                        // seconds are lost
+                        // https://issues.apache.org/jira/browse/FLINK-39214
+                        // Fractional seconds below milliseconds are lost
                         .fromCase(
                                 STRING(),
                                 fromString("12:34:56.123456789"),
@@ -536,8 +536,8 @@ class CastRulesTest {
                                 DATE(),
                                 DateTimeUtils.toInternal(LocalDate.of(2022, 1, 
4)),
                                 timestampDataFromLocalDateTime(2022, 1, 4, 0, 
0, 0, 0))
-                        // https://issues.apache.org/jira/browse/FLINK-17224 
Currently, fractional
-                        // seconds are lost
+                        // https://issues.apache.org/jira/browse/FLINK-39214
+                        // Fractional seconds below milliseconds are lost
                         .fromCase(
                                 TIME(5),
                                 TIME,
@@ -613,8 +613,8 @@ class CastRulesTest {
                                 DATE(),
                                 DateTimeUtils.toInternal(LocalDate.of(2022, 1, 
4)),
                                 timestampDataFromInstant(2022, 1, 4, 1, 0, 0, 
0))
-                        // https://issues.apache.org/jira/browse/FLINK-17224 
Currently, fractional
-                        // seconds are lost
+                        // https://issues.apache.org/jira/browse/FLINK-39214
+                        // Fractional seconds below milliseconds are lost
                         .fromCase(
                                 TIME(5),
                                 TIME,
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java
index 12496179523..66b338aebb6 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java
@@ -41,6 +41,7 @@ import org.apache.flink.table.catalog.IntervalFreshness;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.catalog.TableChange;
 import org.apache.flink.table.catalog.TableDistribution;
 import org.apache.flink.table.catalog.TableDistribution.Kind;
@@ -104,6 +105,7 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static org.apache.flink.table.api.Expressions.$;
+import static 
org.apache.flink.table.planner.operations.SqlDdlToOperationConverterTest.TestItem.createTestItem;
 import static org.apache.flink.table.planner.utils.OperationMatchers.entry;
 import static 
org.apache.flink.table.planner.utils.OperationMatchers.isCreateTableOperation;
 import static 
org.apache.flink.table.planner.utils.OperationMatchers.partitionedBy;
@@ -807,10 +809,10 @@ class SqlDdlToOperationConverterTest extends 
SqlNodeToOperationConversionTestBas
                                 + " ROW<`tmstmp` TIMESTAMP(3)>.");
     }
 
-    @Test // TODO: tweak the tests when FLINK-13604 is fixed.
+    @Test
     void testCreateTableWithFullDataTypes() {
         final List<TestItem> testItems =
-                Arrays.asList(
+                List.of(
                         createTestItem("CHAR", DataTypes.CHAR(1)),
                         createTestItem("CHAR NOT NULL", 
DataTypes.CHAR(1).notNull()),
                         createTestItem("CHAR NULL", DataTypes.CHAR(1)),
@@ -844,10 +846,8 @@ class SqlDdlToOperationConverterTest extends 
SqlNodeToOperationConversionTestBas
                         createTestItem("DATE", DataTypes.DATE()),
                         createTestItem("TIME", DataTypes.TIME()),
                         createTestItem("TIME WITHOUT TIME ZONE", 
DataTypes.TIME()),
-                        // Expect to be TIME(3).
-                        createTestItem("TIME(3)", DataTypes.TIME()),
-                        // Expect to be TIME(3).
-                        createTestItem("TIME(3) WITHOUT TIME ZONE", 
DataTypes.TIME()),
+                        createTestItem("TIME(3)", DataTypes.TIME(3)),
+                        createTestItem("TIME(3) WITHOUT TIME ZONE", 
DataTypes.TIME(3)),
                         createTestItem("TIMESTAMP", DataTypes.TIMESTAMP(6)),
                         createTestItem("TIMESTAMP WITHOUT TIME ZONE", 
DataTypes.TIMESTAMP(6)),
                         createTestItem("TIMESTAMP(3)", DataTypes.TIMESTAMP(3)),
@@ -951,11 +951,15 @@ class SqlDdlToOperationConverterTest extends 
SqlNodeToOperationConversionTestBas
         final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
         SqlNode node = parser.parse(sql);
         assertThat(node).isInstanceOf(SqlCreateTable.class);
-        Operation operation =
-                SqlNodeToOperationConversion.convert(planner, catalogManager, 
node).get();
-        TableSchema schema = ((CreateTableOperation) 
operation).getCatalogTable().getSchema();
-        Object[] expectedDataTypes = testItems.stream().map(item -> 
item.expectedType).toArray();
-        assertThat(schema.getFieldDataTypes()).isEqualTo(expectedDataTypes);
+        final Optional<Operation> convert =
+                SqlNodeToOperationConversion.convert(planner, catalogManager, 
node);
+        assertThat(convert).isPresent();
+        Operation operation = convert.get();
+        ResolvedSchema schema =
+                ((CreateTableOperation) 
operation).getCatalogTable().getResolvedSchema();
+        List<DataType> expectedDataTypes =
+                testItems.stream().map(item -> 
item.expectedType).collect(Collectors.toList());
+        assertThat(schema.getColumnDataTypes()).isEqualTo(expectedDataTypes);
     }
 
     @Test
@@ -2716,18 +2720,6 @@ class SqlDdlToOperationConverterTest extends 
SqlNodeToOperationConversionTestBas
 
     // ~ Tool Methods 
----------------------------------------------------------
 
-    private static TestItem createTestItem(Object... args) {
-        assertThat(args).hasSize(2);
-        final String testExpr = (String) args[0];
-        TestItem testItem = TestItem.fromTestExpr(testExpr);
-        if (args[1] instanceof String) {
-            testItem.withExpectedError((String) args[1]);
-        } else {
-            testItem.withExpectedType(args[1]);
-        }
-        return testItem;
-    }
-
     private CatalogTable prepareTable(boolean hasConstraint) throws Exception {
         return prepareTable("tb1", hasConstraint ? 1 : 0);
     }
@@ -2952,27 +2944,34 @@ class SqlDdlToOperationConverterTest extends 
SqlNodeToOperationConversionTestBas
 
     // ~ Inner Classes 
----------------------------------------------------------
 
-    private static class TestItem {
+    static class TestItem {
         private final String testExpr;
-        @Nullable private Object expectedType;
-        @Nullable private String expectedError;
+        @Nullable private final DataType expectedType;
+        @Nullable private final String expectedError;
 
-        private TestItem(String testExpr) {
+        private TestItem(
+                final String testExpr,
+                @Nullable final DataType expectedType,
+                @Nullable final String expectedError) {
             this.testExpr = testExpr;
+            this.expectedType = expectedType;
+            this.expectedError = expectedError;
         }
 
-        static TestItem fromTestExpr(String testExpr) {
-            return new TestItem(testExpr);
+        private TestItem(final String testExpr, @Nullable final DataType 
expectedType) {
+            this(testExpr, expectedType, null);
         }
 
-        TestItem withExpectedType(Object expectedType) {
-            this.expectedType = expectedType;
-            return this;
+        private TestItem(final String testExpr, @Nullable final String 
expectedError) {
+            this(testExpr, null, expectedError);
         }
 
-        TestItem withExpectedError(String expectedError) {
-            this.expectedError = expectedError;
-            return this;
+        static TestItem createTestItem(String testExpr, DataType dataType) {
+            return new TestItem(testExpr, dataType);
+        }
+
+        static TestItem createTestItem(String testExpr, String expectedError) {
+            return new TestItem(testExpr, expectedError);
         }
 
         @Override
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/MLPredictTableFunctionTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/MLPredictTableFunctionTest.xml
index 8e5c5c6696b..3b0991d8c2d 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/MLPredictTableFunctionTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/MLPredictTableFunctionTest.xml
@@ -104,14 +104,14 @@ FROM TABLE(ML_PREDICT(TABLE TypeTable, MODEL TypeModel, 
DESCRIPTOR(col)))]]>
     <Resource name="ast">
       <![CDATA[
 LogicalProject(col=[$0], res=[$1])
-+- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'), 
DEFAULT())], rowType=[RecordType(TIME(0) col, VARCHAR(2147483647) res)])
++- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'), 
DEFAULT())], rowType=[RecordType(TIME(3) col, VARCHAR(2147483647) res)])
    +- LogicalProject(col=[$0])
       +- LogicalTableScan(table=[[default_catalog, default_database, 
TypeTable]])
 ]]>
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'), 
DEFAULT())], rowType=[RecordType(TIME(0) col, VARCHAR(2147483647) res)])
+MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'), 
DEFAULT())], rowType=[RecordType(TIME(3) col, VARCHAR(2147483647) res)])
 +- TableSourceScan(table=[[default_catalog, default_database, TypeTable]], 
fields=[col])
 ]]>
     </Resource>
@@ -384,14 +384,14 @@ FROM TABLE(ML_PREDICT(TABLE TypeTable, MODEL TypeModel, 
DESCRIPTOR(col)))]]>
     <Resource name="ast">
       <![CDATA[
 LogicalProject(col=[$0], res=[$1])
-+- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'), 
DEFAULT())], rowType=[RecordType(TIME(0) col, VARCHAR(2147483647) res)])
++- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'), 
DEFAULT())], rowType=[RecordType(TIME(3) col, VARCHAR(2147483647) res)])
    +- LogicalProject(col=[$0])
       +- LogicalTableScan(table=[[default_catalog, default_database, 
TypeTable]])
 ]]>
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'), 
DEFAULT())], rowType=[RecordType(TIME(0) col, VARCHAR(2147483647) res)])
+MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'), 
DEFAULT())], rowType=[RecordType(TIME(3) col, VARCHAR(2147483647) res)])
 +- TableSourceScan(table=[[default_catalog, default_database, TypeTable]], 
fields=[col])
 ]]>
     </Resource>
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MLPredictTableFunctionTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MLPredictTableFunctionTest.xml
index b4f8876ce50..b1d51479529 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MLPredictTableFunctionTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MLPredictTableFunctionTest.xml
@@ -104,14 +104,14 @@ FROM TABLE(ML_PREDICT(TABLE TypeTable, MODEL TypeModel, 
DESCRIPTOR(col)))]]>
     <Resource name="ast">
       <![CDATA[
 LogicalProject(col=[$0], res=[$1])
-+- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'), 
DEFAULT())], rowType=[RecordType(TIME(0) col, VARCHAR(2147483647) res)])
++- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'), 
DEFAULT())], rowType=[RecordType(TIME(3) col, VARCHAR(2147483647) res)])
    +- LogicalProject(col=[$0])
       +- LogicalTableScan(table=[[default_catalog, default_database, 
TypeTable]])
 ]]>
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'), 
DEFAULT())], rowType=[RecordType(TIME(0) col, VARCHAR(2147483647) res)])
+MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'), 
DEFAULT())], rowType=[RecordType(TIME(3) col, VARCHAR(2147483647) res)])
 +- TableSourceScan(table=[[default_catalog, default_database, TypeTable]], 
fields=[col])
 ]]>
     </Resource>
@@ -384,14 +384,14 @@ FROM TABLE(ML_PREDICT(TABLE TypeTable, MODEL TypeModel, 
DESCRIPTOR(col)))]]>
     <Resource name="ast">
       <![CDATA[
 LogicalProject(col=[$0], res=[$1])
-+- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'), 
DEFAULT())], rowType=[RecordType(TIME(0) col, VARCHAR(2147483647) res)])
++- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'), 
DEFAULT())], rowType=[RecordType(TIME(3) col, VARCHAR(2147483647) res)])
    +- LogicalProject(col=[$0])
       +- LogicalTableScan(table=[[default_catalog, default_database, 
TypeTable]])
 ]]>
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'), 
DEFAULT())], rowType=[RecordType(TIME(0) col, VARCHAR(2147483647) res)])
+MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL 
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'), 
DEFAULT())], rowType=[RecordType(TIME(3) col, VARCHAR(2147483647) res)])
 +- TableSourceScan(table=[[default_catalog, default_database, TypeTable]], 
fields=[col])
 ]]>
     </Resource>
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/ValuesTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/ValuesTest.xml
index 477ffef8d0c..3ca767eb559 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/ValuesTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/ValuesTest.xml
@@ -244,9 +244,9 @@ Union(all=[true], union=[a, b])
     <Resource name="ast">
       <![CDATA[
 LogicalUnion(all=[true])
-:- LogicalProject(number=[CAST(1:DOUBLE):DOUBLE], row=[ROW(_UTF-16LE'A    ', 
2:DECIMAL(10, 2), ROW(00:00:00))], array=[ARRAY(1:BIGINT)])
+:- LogicalProject(number=[CAST(1:DOUBLE):DOUBLE], row=[ROW(_UTF-16LE'A    ', 
2:DECIMAL(10, 2), ROW(00:00:00:TIME(3)))], array=[ARRAY(1:BIGINT)])
 :  +- LogicalValues(tuples=[[{ 0 }]])
-+- LogicalProject(number=[CAST(3.141592653589793E0:DOUBLE):DOUBLE], 
row=[ROW(_UTF-16LE'ABC  ', 3.0E0:DECIMAL(10, 2), ROW(00:00:00))], 
array=[ARRAY(3:BIGINT)])
++- LogicalProject(number=[CAST(3.141592653589793E0:DOUBLE):DOUBLE], 
row=[ROW(_UTF-16LE'ABC  ', 3.0E0:DECIMAL(10, 2), ROW(00:00:00.1:TIME(3)))], 
array=[ARRAY(3:BIGINT)])
    +- LogicalValues(tuples=[[{ 0 }]])
 ]]>
     </Resource>
@@ -255,7 +255,7 @@ LogicalUnion(all=[true])
 Union(all=[true], union=[number, row, array])
 :- Calc(select=[CAST(1 AS DOUBLE) AS number, ROW('A    ', 2, ROW(00:00:00)) AS 
row, ARRAY(1) AS array])
 :  +- Values(tuples=[[{ 0 }]])(reuse_id=[1])
-+- Calc(select=[CAST(3.141592653589793E0 AS DOUBLE) AS number, ROW('ABC  ', 
3.0E0, ROW(00:00:00)) AS row, ARRAY(3) AS array])
++- Calc(select=[CAST(3.141592653589793E0 AS DOUBLE) AS number, ROW('ABC  ', 
3.0E0, ROW(00:00:00.1)) AS row, ARRAY(3) AS array])
    +- Reused(reference_id=[1])
 ]]>
     </Resource>
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/TemporalTypesTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/TemporalTypesTest.scala
index 351a2d4095c..60e93f1004c 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/TemporalTypesTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/TemporalTypesTest.scala
@@ -610,7 +610,7 @@ class TemporalTypesTest extends ExpressionTestBase {
   @Test
   def testDateAndTime(): Unit = {
     testSqlApi("DATE '2018-03-14'", "2018-03-14")
-    testSqlApi("TIME '19:01:02.123'", "19:01:02")
+    testSqlApi("TIME '19:01:02.123'", "19:01:02.123")
 
     // DATE & TIME
     testSqlApi("CAST('12:44:31' AS TIME)", "12:44:31")
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/TypeCheckUtils.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/TypeCheckUtils.java
index 591e1899b9d..c203b283dc7 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/TypeCheckUtils.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/TypeCheckUtils.java
@@ -37,6 +37,7 @@ import static 
org.apache.flink.table.types.logical.LogicalTypeRoot.ROW;
 import static 
org.apache.flink.table.types.logical.LogicalTypeRoot.STRUCTURED_TYPE;
 import static 
org.apache.flink.table.types.logical.LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE;
 import static 
org.apache.flink.table.types.logical.LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE;
+import static 
org.apache.flink.table.types.logical.LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE;
 import static org.apache.flink.table.types.logical.LogicalTypeRoot.VARIANT;
 import static 
org.apache.flink.table.types.logical.utils.LogicalTypeChecks.isRowtimeAttribute;
 
@@ -196,4 +197,8 @@ public class TypeCheckUtils {
                 return true;
         }
     }
+
+    public static boolean isTime(LogicalType type) {
+        return type.getTypeRoot() == TIME_WITHOUT_TIME_ZONE;
+    }
 }

Reply via email to