This is an automated email from the ASF dual-hosted git repository.
snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new e4d4996551c [FLINK-17224][table] Support precision of `TIME` type
e4d4996551c is described below
commit e4d4996551ce3b097a6c15d7d281a17e3d01be56
Author: Ramin Gharib <[email protected]>
AuthorDate: Mon Sep 1 11:59:20 2025 +0200
[FLINK-17224][table] Support precision of `TIME` type
Co-Authored-By: Sergey Nuyanzin <[email protected]>
This closes #26954.
---
.../flink/table/runtime/batch/AvroTypesITCase.java | 18 ++---
.../json/JsonParserToRowDataConverters.java | 2 +-
.../formats/json/JsonToRowDataConverters.java | 2 +-
.../formats/json/RowDataToJsonConverters.java | 2 +-
.../formats/json/JsonRowDataSerDeSchemaTest.java | 18 +++--
.../apache/flink/table/utils/DateTimeUtils.java | 4 ++
.../functions/aggfunctions/LeadLagAggFunction.java | 7 +-
.../functions/aggfunctions/MaxAggFunction.java | 8 ++-
.../functions/aggfunctions/MinAggFunction.java | 8 ++-
.../aggfunctions/SingleValueAggFunction.java | 8 ++-
.../functions/casting/CastRuleProvider.java | 1 +
.../functions/casting/TimeToTimeCastRule.java | 56 +++++++++++++++
.../table/planner/calcite/FlinkTypeFactory.scala | 10 +--
.../planner/codegen/calls/BuiltInMethods.scala | 3 +
.../planner/codegen/calls/ScalarOperatorGens.scala | 20 ++++--
.../planner/plan/utils/AggFunctionFactory.scala | 12 ++--
.../planner/functions/CastFunctionITCase.java | 82 +++++++++++++++++++---
.../planner/functions/TimeFunctionsITCase.java | 38 +++++-----
.../planner/functions/casting/CastRulesTest.java | 12 ++--
.../operations/SqlDdlToOperationConverterTest.java | 69 +++++++++---------
.../plan/batch/sql/MLPredictTableFunctionTest.xml | 8 +--
.../plan/stream/sql/MLPredictTableFunctionTest.xml | 8 +--
.../table/planner/plan/stream/table/ValuesTest.xml | 6 +-
.../planner/expressions/TemporalTypesTest.scala | 2 +-
.../table/runtime/typeutils/TypeCheckUtils.java | 5 ++
25 files changed, 284 insertions(+), 125 deletions(-)
diff --git
a/flink-formats/flink-avro/src/test/java/org/apache/flink/table/runtime/batch/AvroTypesITCase.java
b/flink-formats/flink-avro/src/test/java/org/apache/flink/table/runtime/batch/AvroTypesITCase.java
index 53853a10ba1..3c97e226980 100644
---
a/flink-formats/flink-avro/src/test/java/org/apache/flink/table/runtime/batch/AvroTypesITCase.java
+++
b/flink-formats/flink-avro/src/test/java/org/apache/flink/table/runtime/batch/AvroTypesITCase.java
@@ -27,12 +27,12 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
-import org.apache.flink.test.util.AbstractTestBaseJUnit4;
+import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.util.CollectionUtil;
import org.apache.avro.util.Utf8;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
@@ -51,7 +51,7 @@ import static org.apache.flink.table.api.Expressions.$;
import static org.assertj.core.api.Assertions.assertThat;
/** Tests for interoperability with Avro types. */
-public class AvroTypesITCase extends AbstractTestBaseJUnit4 {
+class AvroTypesITCase extends AbstractTestBase {
private static final User USER_1 =
User.newBuilder()
@@ -79,7 +79,7 @@ public class AvroTypesITCase extends AbstractTestBaseJUnit4 {
.build())
.setTypeBytes(ByteBuffer.allocate(10))
.setTypeDate(LocalDate.parse("2014-03-01"))
- .setTypeTimeMillis(LocalTime.parse("12:12:12"))
+ .setTypeTimeMillis(LocalTime.parse("12:12:12.345"))
.setTypeTimeMicros(LocalTime.ofSecondOfDay(0).plus(123456L, ChronoUnit.MICROS))
.setTypeTimestampMillis(Instant.parse("2014-03-01T12:12:12.321Z"))
.setTypeTimestampMicros(
@@ -152,7 +152,7 @@ public class AvroTypesITCase extends AbstractTestBaseJUnit4
{
.build();
@Test
- public void testAvroToRow() throws Exception {
+ void testAvroToRow() throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
@@ -167,7 +167,7 @@ public class AvroTypesITCase extends AbstractTestBaseJUnit4
{
String expected =
"+I[Charlie, null, blue, 1337, 1.337, null, false, [], [],
null, RED, {}, null, null, "
+ "{\"num\": 42, \"street\": \"Bakerstreet\",
\"city\": \"Berlin\", \"state\": \"Berlin\", \"zip\": \"12049\"}, "
- + "java.nio.HeapByteBuffer[pos=0 lim=10 cap=10],
2014-03-01, 12:12:12, 00:00:00.123456, 2014-03-01T12:12:12.321Z, "
+ + "java.nio.HeapByteBuffer[pos=0 lim=10 cap=10],
2014-03-01, 12:12:12.345, 00:00:00.123456, 2014-03-01T12:12:12.321Z, "
+ "1970-01-01T00:00:00.123456Z,
java.nio.HeapByteBuffer[pos=0 lim=2 cap=2], [7, -48]]\n"
+ "+I[Whatever, null, black, 42, 0.0, null, true,
[hello], [true], null, GREEN, {}, "
+ "[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
null, null, "
@@ -181,7 +181,7 @@ public class AvroTypesITCase extends AbstractTestBaseJUnit4
{
}
@Test
- public void testAvroStringAccess() {
+ void testAvroStringAccess() {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
@@ -198,7 +198,7 @@ public class AvroTypesITCase extends AbstractTestBaseJUnit4
{
}
@Test
- public void testAvroObjectAccess() throws Exception {
+ void testAvroObjectAccess() throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
@@ -217,7 +217,7 @@ public class AvroTypesITCase extends AbstractTestBaseJUnit4
{
}
@Test
- public void testAvroToAvro() throws Exception {
+ void testAvroToAvro() throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
diff --git
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonParserToRowDataConverters.java
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonParserToRowDataConverters.java
index 6556502f0d4..ea5ced4c905 100644
---
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonParserToRowDataConverters.java
+++
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonParserToRowDataConverters.java
@@ -266,7 +266,7 @@ public class JsonParserToRowDataConverters implements
Serializable {
LocalTime localTime = parsedTime.query(TemporalQueries.localTime());
// get number of milliseconds of the day
- return localTime.toSecondOfDay() * 1000;
+ return (int) (localTime.toNanoOfDay() / 1000_000);
}
private TimestampData convertToTimestamp(JsonParser jp) throws IOException
{
diff --git
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonToRowDataConverters.java
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonToRowDataConverters.java
index 5da937e43c7..653b8b3621b 100644
---
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonToRowDataConverters.java
+++
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonToRowDataConverters.java
@@ -214,7 +214,7 @@ public class JsonToRowDataConverters implements
Serializable {
LocalTime localTime = parsedTime.query(TemporalQueries.localTime());
// get number of milliseconds of the day
- return localTime.toSecondOfDay() * 1000;
+ return (int) (localTime.toNanoOfDay() / 1000_000);
}
private TimestampData convertToTimestamp(JsonNode jsonNode) {
diff --git
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/RowDataToJsonConverters.java
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/RowDataToJsonConverters.java
index 2c64c89cd8d..11ab01a35e8 100644
---
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/RowDataToJsonConverters.java
+++
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/RowDataToJsonConverters.java
@@ -177,7 +177,7 @@ public class RowDataToJsonConverters implements
Serializable {
private RowDataToJsonConverter createTimeConverter() {
return (mapper, reuse, value) -> {
int millisecond = (int) value;
- LocalTime time = LocalTime.ofSecondOfDay(millisecond / 1000L);
+ LocalTime time = LocalTime.ofNanoOfDay(millisecond * 1000_000L);
return
mapper.getNodeFactory().textNode(SQL_TIME_FORMAT.format(time));
};
}
diff --git
a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java
b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java
index 94f59dbba36..0012ede7344 100644
---
a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java
+++
b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java
@@ -118,6 +118,7 @@ public class JsonRowDataSerDeSchemaTest {
Double[] doubles = new Double[] {1.1, 2.2, 3.3};
LocalDate date = LocalDate.parse("1990-10-14");
LocalTime time = LocalTime.parse("12:12:43");
+ LocalTime time3 = LocalTime.parse("12:12:43.123");
Timestamp timestamp3 = Timestamp.valueOf("1990-10-14 12:12:43.123");
Timestamp timestamp9 = Timestamp.valueOf("1990-10-14
12:12:43.123456789");
Instant timestampWithLocalZone =
@@ -152,6 +153,7 @@ public class JsonRowDataSerDeSchemaTest {
root.set("doubles", doubleNode);
root.put("date", "1990-10-14");
root.put("time", "12:12:43");
+ root.put("time3", "12:12:43.123");
root.put("timestamp3", "1990-10-14T12:12:43.123");
root.put("timestamp9", "1990-10-14T12:12:43.123456789");
root.put("timestampWithLocalZone", "1990-10-14T12:12:43.123456789Z");
@@ -175,6 +177,7 @@ public class JsonRowDataSerDeSchemaTest {
FIELD("doubles", ARRAY(DOUBLE())),
FIELD("date", DATE()),
FIELD("time", TIME(0)),
+ FIELD("time3", TIME(3)),
FIELD("timestamp3", TIMESTAMP(3)),
FIELD("timestamp9", TIMESTAMP(9)),
FIELD("timestampWithLocalZone",
TIMESTAMP_WITH_LOCAL_TIME_ZONE(9)),
@@ -188,7 +191,7 @@ public class JsonRowDataSerDeSchemaTest {
isJsonParser, schema, false, false,
TimestampFormat.ISO_8601);
open(deserializationSchema);
- Row expected = new Row(18);
+ Row expected = new Row(19);
expected.setField(0, true);
expected.setField(1, tinyint);
expected.setField(2, smallint);
@@ -201,12 +204,13 @@ public class JsonRowDataSerDeSchemaTest {
expected.setField(9, doubles);
expected.setField(10, date);
expected.setField(11, time);
- expected.setField(12, timestamp3.toLocalDateTime());
- expected.setField(13, timestamp9.toLocalDateTime());
- expected.setField(14, timestampWithLocalZone);
- expected.setField(15, map);
- expected.setField(16, multiSet);
- expected.setField(17, nestedMap);
+ expected.setField(12, time3);
+ expected.setField(13, timestamp3.toLocalDateTime());
+ expected.setField(14, timestamp9.toLocalDateTime());
+ expected.setField(15, timestampWithLocalZone);
+ expected.setField(16, map);
+ expected.setField(17, multiSet);
+ expected.setField(18, nestedMap);
RowData rowData = deserializationSchema.deserialize(serializedJson);
Row actual = convertToExternal(rowData, dataType);
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/DateTimeUtils.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/DateTimeUtils.java
index aaf2b075266..5f9eb01f57a 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/DateTimeUtils.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/DateTimeUtils.java
@@ -1698,6 +1698,10 @@ public class DateTimeUtils {
}
}
+ public static int truncate(int time, int precision) {
+ return (int) zeroLastDigits(time, 3 - precision);
+ }
+
private static long zeroLastDigits(long l, int n) {
long tenToTheN = (long) Math.pow(10, n);
return (l / tenToTheN) * tenToTheN;
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/LeadLagAggFunction.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/LeadLagAggFunction.java
index 9d4ffde91ca..aefddea9ba2 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/LeadLagAggFunction.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/LeadLagAggFunction.java
@@ -264,13 +264,16 @@ public abstract class LeadLagAggFunction extends
DeclarativeAggregateFunction {
/** TimeLeadLagAggFunction. */
public static class TimeLeadLagAggFunction extends LeadLagAggFunction {
- public TimeLeadLagAggFunction(int operandCount) {
+ private final TimeType type;
+
+ public TimeLeadLagAggFunction(TimeType type, int operandCount) {
super(operandCount);
+ this.type = type;
}
@Override
public DataType getResultType() {
- return DataTypes.TIME(TimeType.DEFAULT_PRECISION);
+ return DataTypes.TIME(type.getPrecision());
}
}
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/MaxAggFunction.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/MaxAggFunction.java
index a2de172610f..5d09664442d 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/MaxAggFunction.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/MaxAggFunction.java
@@ -191,9 +191,15 @@ public abstract class MaxAggFunction extends
DeclarativeAggregateFunction {
/** Built-in Time Max aggregate function. */
public static class TimeMaxAggFunction extends MaxAggFunction {
+ private final TimeType type;
+
+ public TimeMaxAggFunction(TimeType type) {
+ this.type = type;
+ }
+
@Override
public DataType getResultType() {
- return DataTypes.TIME(TimeType.DEFAULT_PRECISION);
+ return DataTypes.TIME(type.getPrecision());
}
}
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/MinAggFunction.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/MinAggFunction.java
index 19ee48d233f..80a6090a819 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/MinAggFunction.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/MinAggFunction.java
@@ -185,9 +185,15 @@ public abstract class MinAggFunction extends
DeclarativeAggregateFunction {
/** Built-in Time Min aggregate function. */
public static class TimeMinAggFunction extends MinAggFunction {
+ private final TimeType type;
+
+ public TimeMinAggFunction(TimeType type) {
+ this.type = type;
+ }
+
@Override
public DataType getResultType() {
- return DataTypes.TIME(TimeType.DEFAULT_PRECISION);
+ return DataTypes.TIME(type.getPrecision());
}
}
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/SingleValueAggFunction.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/SingleValueAggFunction.java
index b6b983bd12b..d1cd7b0e20b 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/SingleValueAggFunction.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/SingleValueAggFunction.java
@@ -272,9 +272,15 @@ public abstract class SingleValueAggFunction extends
DeclarativeAggregateFunctio
private static final long serialVersionUID = 320495723666949978L;
+ private final TimeType type;
+
+ public TimeSingleValueAggFunction(TimeType type) {
+ this.type = type;
+ }
+
@Override
public DataType getResultType() {
- return DataTypes.TIME(TimeType.DEFAULT_PRECISION);
+ return DataTypes.TIME(type.getPrecision());
}
}
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CastRuleProvider.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CastRuleProvider.java
index b1a050cd744..7b94816696e 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CastRuleProvider.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/CastRuleProvider.java
@@ -87,6 +87,7 @@ public class CastRuleProvider {
.addRule(TimeToTimestampCastRule.INSTANCE)
.addRule(NumericToTimestampCastRule.INSTANCE)
.addRule(TimestampToNumericCastRule.INSTANCE)
+ .addRule(TimeToTimeCastRule.INSTANCE)
// To binary rules
.addRule(BinaryToBinaryCastRule.INSTANCE)
.addRule(RawToBinaryCastRule.INSTANCE)
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/TimeToTimeCastRule.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/TimeToTimeCastRule.java
new file mode 100644
index 00000000000..25d07ded6c7
--- /dev/null
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/TimeToTimeCastRule.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.casting;
+
+import org.apache.flink.table.planner.codegen.calls.BuiltInMethods;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
+
+import static
org.apache.flink.table.planner.functions.casting.CastRuleUtils.staticCall;
+
+/**
+ * {@link LogicalTypeRoot#TIME_WITHOUT_TIME_ZONE} to {@link
LogicalTypeRoot#TIME_WITHOUT_TIME_ZONE}.
+ */
+class TimeToTimeCastRule extends
AbstractExpressionCodeGeneratorCastRule<Number, Number> {
+
+ static final TimeToTimeCastRule INSTANCE = new TimeToTimeCastRule();
+
+ private TimeToTimeCastRule() {
+ super(
+ CastRulePredicate.builder()
+ .input(LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE)
+ .target(LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE)
+ .build());
+ }
+
+ @Override
+ public String generateExpression(
+ CodeGeneratorCastRule.Context context,
+ String inputTerm,
+ LogicalType inputLogicalType,
+ LogicalType targetLogicalType) {
+ final int inputPrecision =
LogicalTypeChecks.getPrecision(inputLogicalType);
+ int targetPrecision =
LogicalTypeChecks.getPrecision(targetLogicalType);
+
+ return inputPrecision <= targetPrecision
+ ? inputTerm
+ : staticCall(BuiltInMethods.TRUNCATE_SQL_TIME(), inputTerm,
targetPrecision);
+ }
+}
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkTypeFactory.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkTypeFactory.scala
index 9949e91cbaa..44de1c7b1dc 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkTypeFactory.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkTypeFactory.scala
@@ -24,7 +24,6 @@ import org.apache.flink.table.legacy.api.TableSchema
import org.apache.flink.table.legacy.types.logical.TypeInformationRawType
import org.apache.flink.table.planner.calcite.FlinkTypeFactory.toLogicalType
import org.apache.flink.table.planner.plan.schema._
-import org.apache.flink.table.planner.utils.JavaScalaConversionUtil
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil.toScala
import org.apache.flink.table.runtime.types.{LogicalTypeDataTypeConverter,
PlannerTypeUtils}
import org.apache.flink.table.types.logical._
@@ -87,7 +86,8 @@ class FlinkTypeFactory(
// temporal types
case LogicalTypeRoot.DATE => createSqlType(DATE)
- case LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE => createSqlType(TIME)
+ case LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE =>
+ createSqlType(TIME, t.asInstanceOf[TimeType].getPrecision)
// interval types
case LogicalTypeRoot.INTERVAL_YEAR_MONTH =>
@@ -630,11 +630,7 @@ object FlinkTypeFactory {
// temporal types
case DATE => new DateType()
case TIME =>
- if (relDataType.getPrecision > 3) {
- throw new TableException(s"TIME precision is not supported:
${relDataType.getPrecision}")
- }
- // the planner supports precision 3, but for consistency with old
planner, we set it to 0.
- new TimeType()
+ new TimeType(relDataType.getPrecision)
case TIMESTAMP =>
new TimestampType(relDataType.getPrecision)
case TIMESTAMP_WITH_LOCAL_TIME_ZONE =>
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala
index e434e5ee989..a6392ae4a4d 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala
@@ -453,6 +453,9 @@ object BuiltInMethods {
val TRUNCATE_SQL_TIMESTAMP =
Types.lookupMethod(classOf[DateTimeUtils], "truncate",
classOf[TimestampData], classOf[Int])
+ val TRUNCATE_SQL_TIME =
+ Types.lookupMethod(classOf[DateTimeUtils], "truncate", classOf[Int],
classOf[Int])
+
val ADD_MONTHS =
Types.lookupMethod(classOf[DateTimeUtils], "addMonths", classOf[Long],
classOf[Int])
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
index 11b1cf199ab..208c0d8d92c 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
@@ -181,14 +181,22 @@ object ScalarOperatorGens {
}
case (TIME_WITHOUT_TIME_ZONE, INTERVAL_DAY_TIME) =>
- generateOperatorIfNotNull(ctx, new TimeType(), left, right) {
+ generateOperatorIfNotNull(
+ ctx,
+ new TimeType(LogicalTypeChecks.getPrecision(left.resultType)),
+ left,
+ right) {
(l, r) =>
s"java.lang.Math.toIntExact((($l + ${MILLIS_PER_DAY}L) $op (" +
s"java.lang.Math.toIntExact($r % ${MILLIS_PER_DAY}L))) %
${MILLIS_PER_DAY}L)"
}
case (TIME_WITHOUT_TIME_ZONE, INTERVAL_YEAR_MONTH) =>
- generateOperatorIfNotNull(ctx, new TimeType(), left, right)((l, r) =>
s"$l")
+ generateOperatorIfNotNull(
+ ctx,
+ new TimeType(LogicalTypeChecks.getPrecision(left.resultType)),
+ left,
+ right)((l, r) => s"$l")
case (TIMESTAMP_WITHOUT_TIME_ZONE | TIMESTAMP_WITH_LOCAL_TIME_ZONE,
INTERVAL_DAY_TIME) =>
generateOperatorIfNotNull(ctx, left.resultType, left, right) {
@@ -609,9 +617,11 @@ object ScalarOperatorGens {
}
}
// both sides are numeric
- else if (isNumeric(left.resultType) && isNumeric(right.resultType)) {
- (leftTerm, rightTerm) => s"$leftTerm $operator $rightTerm"
- }
+ else if (
+ isNumeric(left.resultType) && isNumeric(right.resultType)
+ || isTime(left.resultType) &&
+ isTime(right.resultType)
+ ) { (leftTerm, rightTerm) => s"$leftTerm $operator $rightTerm" }
// both sides are timestamp
else if (isTimestamp(left.resultType) && isTimestamp(right.resultType)) {
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala
index a157ea591ec..ce6bbece424 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala
@@ -318,7 +318,8 @@ class AggFunctionFactory(
case DATE =>
new MinAggFunction.DateMinAggFunction
case TIME_WITHOUT_TIME_ZONE =>
- new MinAggFunction.TimeMinAggFunction
+ val t = argTypes(0).asInstanceOf[TimeType]
+ new MinAggFunction.TimeMinAggFunction(t)
case TIMESTAMP_WITHOUT_TIME_ZONE =>
val d = argTypes(0).asInstanceOf[TimestampType]
new MinAggFunction.TimestampMinAggFunction(d)
@@ -377,7 +378,8 @@ class AggFunctionFactory(
case DATE =>
new LeadLagAggFunction.DateLeadLagAggFunction(argTypes.length)
case TIME_WITHOUT_TIME_ZONE =>
- new LeadLagAggFunction.TimeLeadLagAggFunction(argTypes.length)
+ val t = argTypes(0).asInstanceOf[TimeType]
+ new LeadLagAggFunction.TimeLeadLagAggFunction(t, argTypes.length)
case TIMESTAMP_WITHOUT_TIME_ZONE =>
val d = argTypes(0).asInstanceOf[TimestampType]
new LeadLagAggFunction.TimestampLeadLagAggFunction(argTypes.length, d)
@@ -427,7 +429,8 @@ class AggFunctionFactory(
case DATE =>
new MaxAggFunction.DateMaxAggFunction
case TIME_WITHOUT_TIME_ZONE =>
- new MaxAggFunction.TimeMaxAggFunction
+ val t = argTypes(0).asInstanceOf[TimeType]
+ new MaxAggFunction.TimeMaxAggFunction(t)
case TIMESTAMP_WITHOUT_TIME_ZONE =>
val d = argTypes(0).asInstanceOf[TimestampType]
new MaxAggFunction.TimestampMaxAggFunction(d)
@@ -520,7 +523,8 @@ class AggFunctionFactory(
case DATE =>
new DateSingleValueAggFunction
case TIME_WITHOUT_TIME_ZONE =>
- new TimeSingleValueAggFunction
+ val t = argTypes(0).asInstanceOf[TimeType]
+ new TimeSingleValueAggFunction(t)
case TIMESTAMP_WITHOUT_TIME_ZONE =>
val d = argTypes(0).asInstanceOf[TimestampType]
new TimestampSingleValueAggFunction(d)
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionITCase.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionITCase.java
index a5b31c0dbac..462b25eab8a 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionITCase.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionITCase.java
@@ -724,10 +724,10 @@ public class CastFunctionITCase extends
BuiltInFunctionTestBase {
.failRuntime(STRING(), "123:45",
DateTimeException.class)
.failRuntime(STRING(), "2021-09-27",
DateTimeException.class)
.failRuntime(STRING(), "2021-09-27 12:34:56",
DateTimeException.class)
- // https://issues.apache.org/jira/browse/FLINK-17224
Fractional seconds are
- // lost
.fromCase(STRING(), "23", LocalTime.of(23, 0, 0, 0))
.fromCase(STRING(), "23:45", LocalTime.of(23, 45, 0,
0))
+ // https://issues.apache.org/jira/browse/FLINK-39214
+ // Fractional seconds below milliseconds are lost
.fromCase(STRING(), "12:34:56.123456789",
LocalTime.of(12, 34, 56, 0))
.failRuntime(
STRING(), "2021-09-27 12:34:56.123456789",
DateTimeException.class)
@@ -764,6 +764,67 @@ public class CastFunctionITCase extends
BuiltInFunctionTestBase {
// ROW
// RAW
.build(),
+ CastTestSpecBuilder.testCastTo(TIME(3))
+ .fromCase(TIME(), null, null)
+ .failRuntime(CHAR(3), "foo", DateTimeException.class)
+ .failRuntime(VARCHAR(5), "Flink",
DateTimeException.class)
+ .failRuntime(STRING(), "Flink",
DateTimeException.class)
+ .failRuntime(STRING(), "123", DateTimeException.class)
+ .failRuntime(STRING(), "123:45",
DateTimeException.class)
+ .failRuntime(STRING(), "2021-09-27",
DateTimeException.class)
+ .failRuntime(STRING(), "2021-09-27 12:34:56",
DateTimeException.class)
+ .fromCase(STRING(), "23", LocalTime.of(23, 0, 0, 0))
+ .fromCase(STRING(), "23:45", LocalTime.of(23, 45, 0,
0))
+ .fromCase(
+ STRING(),
+ "12:34:56.123456789",
+ LocalTime.of(12, 34, 56, 123_000_000))
+ .failRuntime(
+ STRING(), "2021-09-27 12:34:56.123456789",
DateTimeException.class)
+ // Not supported - no fix
+ .failValidation(BOOLEAN(), true)
+ .failTableApiValidation(BINARY(2), DEFAULT_BINARY)
+ .failTableApiValidation(VARBINARY(5),
DEFAULT_VARBINARY)
+ .failTableApiValidation(BYTES(), DEFAULT_BYTES)
+ .failValidation(DECIMAL(5, 3), 12.345)
+ .failValidation(TINYINT(), DEFAULT_POSITIVE_TINY_INT)
+ .failValidation(SMALLINT(), DEFAULT_POSITIVE_SMALL_INT)
+ .failValidation(INT(), DEFAULT_POSITIVE_INT)
+ .failValidation(BIGINT(), DEFAULT_POSITIVE_BIGINT)
+ .failValidation(FLOAT(), DEFAULT_POSITIVE_FLOAT)
+ .failValidation(DOUBLE(), DEFAULT_POSITIVE_DOUBLE)
+ .failValidation(DATE(), DEFAULT_DATE)
+ .fromCase(TIME(5), DEFAULT_TIME, LocalTime.of(12, 34,
56, 123_000_000))
+ .fromCase(
+ TIMESTAMP(),
+ DEFAULT_TIMESTAMP,
+ LocalTime.of(12, 34, 56, 123_000_000))
+ .fromCase(
+ TIMESTAMP(4),
+ DEFAULT_TIMESTAMP,
+ LocalTime.of(12, 34, 56, 123_000_000))
+
+ // https://issues.apache.org/jira/browse/FLINK-20869
+ // TIMESTAMP_WITH_TIME_ZONE
+
+ // https://issues.apache.org/jira/browse/FLINK-24422 -
Accept only Instant
+ .fromCase(
+ TIMESTAMP_LTZ(4),
+ DEFAULT_TIMESTAMP,
+ LocalTime.of(12, 34, 56, 123_000_000))
+ .fromCase(
+ TIMESTAMP_LTZ(4),
+ DEFAULT_TIMESTAMP_LTZ,
+ LocalTime.of(7, 54, 56, 123_000_000))
+ // Not supported - no fix
+ .failValidation(INTERVAL(YEAR(), MONTH()),
DEFAULT_INTERVAL_YEAR)
+ .failValidation(INTERVAL(DAY(), SECOND()),
DEFAULT_INTERVAL_DAY)
+ .failValidation(ARRAY(INT()), DEFAULT_ARRAY)
+ // MULTISET
+ // MAP
+ // ROW
+ // RAW
+ .build(),
CastTestSpecBuilder.testCastTo(TIMESTAMP(9))
.fromCase(TIMESTAMP(), null, null)
.failRuntime(CHAR(3), "foo", DateTimeException.class)
@@ -790,12 +851,14 @@ public class CastFunctionITCase extends
BuiltInFunctionTestBase {
//
.fromCase(DATE(), DEFAULT_DATE, LocalDateTime.of(2021,
9, 24, 0, 0, 0, 0))
- // https://issues.apache.org/jira/browse/FLINK-17224
Fractional seconds are
- // lost
+ // https://issues.apache.org/jira/browse/FLINK-39214
+ // Fractional seconds below milliseconds are lost
// https://issues.apache.org/jira/browse/FLINK-24423
Continue using EPOCH
// date or use 0 for the year?
.fromCase(
- TIME(5), DEFAULT_TIME, LocalDateTime.of(1970,
1, 1, 12, 34, 56, 0))
+ TIME(5),
+ DEFAULT_TIME,
+ LocalDateTime.of(1970, 1, 1, 12, 34, 56,
123_000_000))
.fromCase(
TIMESTAMP(),
DEFAULT_TIMESTAMP,
@@ -868,14 +931,13 @@ public class CastFunctionITCase extends
BuiltInFunctionTestBase {
DEFAULT_DATE,
fromLocalToUTC(LocalDateTime.of(2021, 9, 24,
0, 0, 0, 0)))
- // https://issues.apache.org/jira/browse/FLINK-17224
Fractional seconds are
- // lost
// https://issues.apache.org/jira/browse/FLINK-24423
Continue using EPOCH
// date or use 0 for the year?
.fromCase(
TIME(5),
DEFAULT_TIME,
- fromLocalToUTC(LocalDateTime.of(1970, 1, 1,
12, 34, 56, 0)))
+ fromLocalToUTC(
+ LocalDateTime.of(1970, 1, 1, 12, 34,
56, 123_000_000)))
.fromCase(
TIMESTAMP(),
DEFAULT_TIMESTAMP,
@@ -1039,9 +1101,7 @@ public class CastFunctionITCase extends
BuiltInFunctionTestBase {
DEFAULT_NEGATIVE_DOUBLE,
String.valueOf(DEFAULT_NEGATIVE_DOUBLE))
.fromCase(DATE(), DEFAULT_DATE, "2021-09-24")
- // https://issues.apache.org/jira/browse/FLINK-17224
Currently, fractional
- // seconds are lost
- .fromCase(TIME(5), DEFAULT_TIME, "12:34:56")
+ .fromCase(TIME(5), DEFAULT_TIME, "12:34:56.123")
.fromCase(TIMESTAMP(), DEFAULT_TIMESTAMP, "2021-09-24
12:34:56.123456")
.fromCase(TIMESTAMP(9), DEFAULT_TIMESTAMP, "2021-09-24
12:34:56.123456700")
.fromCase(TIMESTAMP(4), DEFAULT_TIMESTAMP, "2021-09-24
12:34:56.1234")
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/TimeFunctionsITCase.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/TimeFunctionsITCase.java
index fc500d9a518..fe4b766ffd0 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/TimeFunctionsITCase.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/TimeFunctionsITCase.java
@@ -355,11 +355,11 @@ class TimeFunctionsITCase extends BuiltInFunctionTestBase
{
return Stream.of(
TestSetSpec.forFunction(BuiltInFunctionDefinitions.TEMPORAL_OVERLAPS)
.onFieldsWithData(
- LocalTime.of(2, 55, 0),
+ LocalTime.of(2, 55, 0, 123_000_000),
Duration.ofHours(1),
LocalTime.of(3, 30, 0),
Duration.ofHours(2))
- .andDataTypes(TIME(), INTERVAL(HOUR()), TIME(),
INTERVAL(HOUR()))
+ .andDataTypes(TIME(3), INTERVAL(HOUR()), TIME(3),
INTERVAL(HOUR()))
.testResult(
temporalOverlaps($("f0"), $("f1"), $("f2"),
$("f3")),
"(f0, f1) OVERLAPS (f2, f3)",
@@ -450,18 +450,16 @@ class TimeFunctionsITCase extends BuiltInFunctionTestBase
{
return Stream.of(
TestSetSpec.forFunction(BuiltInFunctionDefinitions.FLOOR)
.onFieldsWithData(
- //
https://issues.apache.org/jira/browse/FLINK-17224
- // Fractional seconds are lost
- LocalTime.of(11, 22, 33),
+ LocalTime.of(11, 22, 33, 123_456_789),
LocalDate.of(1990, 10, 14),
LocalDateTime.of(2020, 2, 29, 1, 56, 59,
987654321),
LocalDateTime.of(2021, 9, 24, 9, 20, 50,
924325471))
- .andDataTypes(TIME(), DATE(), TIMESTAMP(), TIMESTAMP())
+ .andDataTypes(TIME(3), DATE(), TIMESTAMP(),
TIMESTAMP())
.testResult(
$("f0").ceil(TimeIntervalUnit.MILLISECOND),
"CEIL(f0 TO MILLISECOND)",
- LocalTime.of(11, 22, 33),
- TIME().nullable())
+ LocalTime.of(11, 22, 33, 123_000_000),
+ TIME(3).nullable())
.testResult(
$("f1").ceil(TimeIntervalUnit.MILLISECOND),
"CEIL(f1 TO MILLISECOND)",
@@ -475,8 +473,8 @@ class TimeFunctionsITCase extends BuiltInFunctionTestBase {
.testResult(
$("f0").ceil(TimeIntervalUnit.SECOND),
"CEIL(f0 TO SECOND)",
- LocalTime.of(11, 22, 33),
- TIME().nullable())
+ LocalTime.of(11, 22, 34),
+ TIME(3).nullable())
.testResult(
$("f1").ceil(TimeIntervalUnit.SECOND),
"CEIL(f1 TO SECOND)",
@@ -491,7 +489,7 @@ class TimeFunctionsITCase extends BuiltInFunctionTestBase {
$("f0").ceil(TimeIntervalUnit.MINUTE),
"CEIL(f0 TO MINUTE)",
LocalTime.of(11, 23),
- TIME().nullable())
+ TIME(3).nullable())
.testResult(
$("f1").ceil(TimeIntervalUnit.MINUTE),
"CEIL(f1 TO MINUTE)",
@@ -506,7 +504,7 @@ class TimeFunctionsITCase extends BuiltInFunctionTestBase {
$("f0").ceil(TimeIntervalUnit.HOUR),
"CEIL(f0 TO HOUR)",
LocalTime.of(12, 0),
- TIME().nullable())
+ TIME(3).nullable())
.testResult(
$("f1").ceil(TimeIntervalUnit.HOUR),
"CEIL(f1 TO HOUR)",
@@ -635,17 +633,15 @@ class TimeFunctionsITCase extends BuiltInFunctionTestBase
{
return Stream.of(
TestSetSpec.forFunction(BuiltInFunctionDefinitions.FLOOR)
.onFieldsWithData(
- //
https://issues.apache.org/jira/browse/FLINK-17224
- // Fractional seconds are lost
- LocalTime.of(11, 22, 33),
+ LocalTime.of(11, 22, 33, 123_456_789),
LocalDate.of(1990, 10, 14),
LocalDateTime.of(2020, 2, 29, 1, 56, 59,
987654321))
- .andDataTypes(TIME(), DATE(), TIMESTAMP())
+ .andDataTypes(TIME(3), DATE(), TIMESTAMP())
.testResult(
$("f0").floor(TimeIntervalUnit.MILLISECOND),
"FLOOR(f0 TO MILLISECOND)",
- LocalTime.of(11, 22, 33),
- TIME().nullable())
+ LocalTime.of(11, 22, 33, 123_000_000),
+ TIME(3).nullable())
.testResult(
$("f1").floor(TimeIntervalUnit.MILLISECOND),
"FLOOR(f1 TO MILLISECOND)",
@@ -660,7 +656,7 @@ class TimeFunctionsITCase extends BuiltInFunctionTestBase {
$("f0").floor(TimeIntervalUnit.SECOND),
"FLOOR(f0 TO SECOND)",
LocalTime.of(11, 22, 33),
- TIME().nullable())
+ TIME(3).nullable())
.testResult(
$("f1").floor(TimeIntervalUnit.SECOND),
"FLOOR(f1 TO SECOND)",
@@ -675,7 +671,7 @@ class TimeFunctionsITCase extends BuiltInFunctionTestBase {
$("f0").floor(TimeIntervalUnit.MINUTE),
"FLOOR(f0 TO MINUTE)",
LocalTime.of(11, 22),
- TIME().nullable())
+ TIME(3).nullable())
.testResult(
$("f1").floor(TimeIntervalUnit.MINUTE),
"FLOOR(f1 TO MINUTE)",
@@ -690,7 +686,7 @@ class TimeFunctionsITCase extends BuiltInFunctionTestBase {
$("f0").floor(TimeIntervalUnit.HOUR),
"FLOOR(f0 TO HOUR)",
LocalTime.of(11, 0),
- TIME().nullable())
+ TIME(3).nullable())
.testResult(
$("f1").floor(TimeIntervalUnit.HOUR),
"FLOOR(f1 TO HOUR)",
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRulesTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRulesTest.java
index 768736d4c63..82b0d7b0af7 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRulesTest.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRulesTest.java
@@ -475,8 +475,8 @@ class CastRulesTest {
STRING(),
fromString("2021-09-27 12:34:56"),
TableRuntimeException.class)
- // https://issues.apache.org/jira/browse/FLINK-17224
Currently, fractional
- // seconds are lost
+ // https://issues.apache.org/jira/browse/FLINK-39214
+ // Fractional seconds below milliseconds are lost
.fromCase(
STRING(),
fromString("12:34:56.123456789"),
@@ -536,8 +536,8 @@ class CastRulesTest {
DATE(),
DateTimeUtils.toInternal(LocalDate.of(2022, 1,
4)),
timestampDataFromLocalDateTime(2022, 1, 4, 0,
0, 0, 0))
- // https://issues.apache.org/jira/browse/FLINK-17224
Currently, fractional
- // seconds are lost
+ // https://issues.apache.org/jira/browse/FLINK-39214
+ // Fractional seconds below milliseconds are lost
.fromCase(
TIME(5),
TIME,
@@ -613,8 +613,8 @@ class CastRulesTest {
DATE(),
DateTimeUtils.toInternal(LocalDate.of(2022, 1,
4)),
timestampDataFromInstant(2022, 1, 4, 1, 0, 0,
0))
- // https://issues.apache.org/jira/browse/FLINK-17224
Currently, fractional
- // seconds are lost
+ // https://issues.apache.org/jira/browse/FLINK-39214
+ // Fractional seconds below milliseconds are lost
.fromCase(
TIME(5),
TIME,
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java
index 12496179523..66b338aebb6 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java
@@ -41,6 +41,7 @@ import org.apache.flink.table.catalog.IntervalFreshness;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.TableChange;
import org.apache.flink.table.catalog.TableDistribution;
import org.apache.flink.table.catalog.TableDistribution.Kind;
@@ -104,6 +105,7 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.flink.table.api.Expressions.$;
+import static
org.apache.flink.table.planner.operations.SqlDdlToOperationConverterTest.TestItem.createTestItem;
import static org.apache.flink.table.planner.utils.OperationMatchers.entry;
import static
org.apache.flink.table.planner.utils.OperationMatchers.isCreateTableOperation;
import static
org.apache.flink.table.planner.utils.OperationMatchers.partitionedBy;
@@ -807,10 +809,10 @@ class SqlDdlToOperationConverterTest extends
SqlNodeToOperationConversionTestBas
+ " ROW<`tmstmp` TIMESTAMP(3)>.");
}
- @Test // TODO: tweak the tests when FLINK-13604 is fixed.
+ @Test
void testCreateTableWithFullDataTypes() {
final List<TestItem> testItems =
- Arrays.asList(
+ List.of(
createTestItem("CHAR", DataTypes.CHAR(1)),
createTestItem("CHAR NOT NULL",
DataTypes.CHAR(1).notNull()),
createTestItem("CHAR NULL", DataTypes.CHAR(1)),
@@ -844,10 +846,8 @@ class SqlDdlToOperationConverterTest extends
SqlNodeToOperationConversionTestBas
createTestItem("DATE", DataTypes.DATE()),
createTestItem("TIME", DataTypes.TIME()),
createTestItem("TIME WITHOUT TIME ZONE",
DataTypes.TIME()),
- // Expect to be TIME(3).
- createTestItem("TIME(3)", DataTypes.TIME()),
- // Expect to be TIME(3).
- createTestItem("TIME(3) WITHOUT TIME ZONE",
DataTypes.TIME()),
+ createTestItem("TIME(3)", DataTypes.TIME(3)),
+ createTestItem("TIME(3) WITHOUT TIME ZONE",
DataTypes.TIME(3)),
createTestItem("TIMESTAMP", DataTypes.TIMESTAMP(6)),
createTestItem("TIMESTAMP WITHOUT TIME ZONE",
DataTypes.TIMESTAMP(6)),
createTestItem("TIMESTAMP(3)", DataTypes.TIMESTAMP(3)),
@@ -951,11 +951,15 @@ class SqlDdlToOperationConverterTest extends
SqlNodeToOperationConversionTestBas
final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
SqlNode node = parser.parse(sql);
assertThat(node).isInstanceOf(SqlCreateTable.class);
- Operation operation =
- SqlNodeToOperationConversion.convert(planner, catalogManager,
node).get();
- TableSchema schema = ((CreateTableOperation)
operation).getCatalogTable().getSchema();
- Object[] expectedDataTypes = testItems.stream().map(item ->
item.expectedType).toArray();
- assertThat(schema.getFieldDataTypes()).isEqualTo(expectedDataTypes);
+ final Optional<Operation> convert =
+ SqlNodeToOperationConversion.convert(planner, catalogManager,
node);
+ assertThat(convert).isPresent();
+ Operation operation = convert.get();
+ ResolvedSchema schema =
+ ((CreateTableOperation)
operation).getCatalogTable().getResolvedSchema();
+ List<DataType> expectedDataTypes =
+ testItems.stream().map(item ->
item.expectedType).collect(Collectors.toList());
+ assertThat(schema.getColumnDataTypes()).isEqualTo(expectedDataTypes);
}
@Test
@@ -2716,18 +2720,6 @@ class SqlDdlToOperationConverterTest extends
SqlNodeToOperationConversionTestBas
// ~ Tool Methods
----------------------------------------------------------
- private static TestItem createTestItem(Object... args) {
- assertThat(args).hasSize(2);
- final String testExpr = (String) args[0];
- TestItem testItem = TestItem.fromTestExpr(testExpr);
- if (args[1] instanceof String) {
- testItem.withExpectedError((String) args[1]);
- } else {
- testItem.withExpectedType(args[1]);
- }
- return testItem;
- }
-
private CatalogTable prepareTable(boolean hasConstraint) throws Exception {
return prepareTable("tb1", hasConstraint ? 1 : 0);
}
@@ -2952,27 +2944,34 @@ class SqlDdlToOperationConverterTest extends
SqlNodeToOperationConversionTestBas
// ~ Inner Classes
----------------------------------------------------------
- private static class TestItem {
+ static class TestItem {
private final String testExpr;
- @Nullable private Object expectedType;
- @Nullable private String expectedError;
+ @Nullable private final DataType expectedType;
+ @Nullable private final String expectedError;
- private TestItem(String testExpr) {
+ private TestItem(
+ final String testExpr,
+ @Nullable final DataType expectedType,
+ @Nullable final String expectedError) {
this.testExpr = testExpr;
+ this.expectedType = expectedType;
+ this.expectedError = expectedError;
}
- static TestItem fromTestExpr(String testExpr) {
- return new TestItem(testExpr);
+ private TestItem(final String testExpr, @Nullable final DataType
expectedType) {
+ this(testExpr, expectedType, null);
}
- TestItem withExpectedType(Object expectedType) {
- this.expectedType = expectedType;
- return this;
+ private TestItem(final String testExpr, @Nullable final String
expectedError) {
+ this(testExpr, null, expectedError);
}
- TestItem withExpectedError(String expectedError) {
- this.expectedError = expectedError;
- return this;
+ static TestItem createTestItem(String testExpr, DataType dataType) {
+ return new TestItem(testExpr, dataType);
+ }
+
+ static TestItem createTestItem(String testExpr, String expectedError) {
+ return new TestItem(testExpr, expectedError);
}
@Override
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/MLPredictTableFunctionTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/MLPredictTableFunctionTest.xml
index 8e5c5c6696b..3b0991d8c2d 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/MLPredictTableFunctionTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/MLPredictTableFunctionTest.xml
@@ -104,14 +104,14 @@ FROM TABLE(ML_PREDICT(TABLE TypeTable, MODEL TypeModel,
DESCRIPTOR(col)))]]>
<Resource name="ast">
<![CDATA[
LogicalProject(col=[$0], res=[$1])
-+- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'),
DEFAULT())], rowType=[RecordType(TIME(0) col, VARCHAR(2147483647) res)])
++- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'),
DEFAULT())], rowType=[RecordType(TIME(3) col, VARCHAR(2147483647) res)])
+- LogicalProject(col=[$0])
+- LogicalTableScan(table=[[default_catalog, default_database,
TypeTable]])
]]>
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
-MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'),
DEFAULT())], rowType=[RecordType(TIME(0) col, VARCHAR(2147483647) res)])
+MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'),
DEFAULT())], rowType=[RecordType(TIME(3) col, VARCHAR(2147483647) res)])
+- TableSourceScan(table=[[default_catalog, default_database, TypeTable]],
fields=[col])
]]>
</Resource>
@@ -384,14 +384,14 @@ FROM TABLE(ML_PREDICT(TABLE TypeTable, MODEL TypeModel,
DESCRIPTOR(col)))]]>
<Resource name="ast">
<![CDATA[
LogicalProject(col=[$0], res=[$1])
-+- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'),
DEFAULT())], rowType=[RecordType(TIME(0) col, VARCHAR(2147483647) res)])
++- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'),
DEFAULT())], rowType=[RecordType(TIME(3) col, VARCHAR(2147483647) res)])
+- LogicalProject(col=[$0])
+- LogicalTableScan(table=[[default_catalog, default_database,
TypeTable]])
]]>
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
-MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'),
DEFAULT())], rowType=[RecordType(TIME(0) col, VARCHAR(2147483647) res)])
+MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'),
DEFAULT())], rowType=[RecordType(TIME(3) col, VARCHAR(2147483647) res)])
+- TableSourceScan(table=[[default_catalog, default_database, TypeTable]],
fields=[col])
]]>
</Resource>
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MLPredictTableFunctionTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MLPredictTableFunctionTest.xml
index b4f8876ce50..b1d51479529 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MLPredictTableFunctionTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MLPredictTableFunctionTest.xml
@@ -104,14 +104,14 @@ FROM TABLE(ML_PREDICT(TABLE TypeTable, MODEL TypeModel,
DESCRIPTOR(col)))]]>
<Resource name="ast">
<![CDATA[
LogicalProject(col=[$0], res=[$1])
-+- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'),
DEFAULT())], rowType=[RecordType(TIME(0) col, VARCHAR(2147483647) res)])
++- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'),
DEFAULT())], rowType=[RecordType(TIME(3) col, VARCHAR(2147483647) res)])
+- LogicalProject(col=[$0])
+- LogicalTableScan(table=[[default_catalog, default_database,
TypeTable]])
]]>
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
-MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'),
DEFAULT())], rowType=[RecordType(TIME(0) col, VARCHAR(2147483647) res)])
+MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'),
DEFAULT())], rowType=[RecordType(TIME(3) col, VARCHAR(2147483647) res)])
+- TableSourceScan(table=[[default_catalog, default_database, TypeTable]],
fields=[col])
]]>
</Resource>
@@ -384,14 +384,14 @@ FROM TABLE(ML_PREDICT(TABLE TypeTable, MODEL TypeModel,
DESCRIPTOR(col)))]]>
<Resource name="ast">
<![CDATA[
LogicalProject(col=[$0], res=[$1])
-+- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'),
DEFAULT())], rowType=[RecordType(TIME(0) col, VARCHAR(2147483647) res)])
++- LogicalTableFunctionScan(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'),
DEFAULT())], rowType=[RecordType(TIME(3) col, VARCHAR(2147483647) res)])
+- LogicalProject(col=[$0])
+- LogicalTableScan(table=[[default_catalog, default_database,
TypeTable]])
]]>
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
-MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'),
DEFAULT())], rowType=[RecordType(TIME(0) col, VARCHAR(2147483647) res)])
+MLPredictTableFunction(invocation=[ML_PREDICT(TABLE(#0), Model(MODEL
default_catalog.default_database.TypeModel), DESCRIPTOR(_UTF-16LE'col'),
DEFAULT())], rowType=[RecordType(TIME(3) col, VARCHAR(2147483647) res)])
+- TableSourceScan(table=[[default_catalog, default_database, TypeTable]],
fields=[col])
]]>
</Resource>
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/ValuesTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/ValuesTest.xml
index 477ffef8d0c..3ca767eb559 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/ValuesTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/ValuesTest.xml
@@ -244,9 +244,9 @@ Union(all=[true], union=[a, b])
<Resource name="ast">
<![CDATA[
LogicalUnion(all=[true])
-:- LogicalProject(number=[CAST(1:DOUBLE):DOUBLE], row=[ROW(_UTF-16LE'A ',
2:DECIMAL(10, 2), ROW(00:00:00))], array=[ARRAY(1:BIGINT)])
+:- LogicalProject(number=[CAST(1:DOUBLE):DOUBLE], row=[ROW(_UTF-16LE'A ',
2:DECIMAL(10, 2), ROW(00:00:00:TIME(3)))], array=[ARRAY(1:BIGINT)])
: +- LogicalValues(tuples=[[{ 0 }]])
-+- LogicalProject(number=[CAST(3.141592653589793E0:DOUBLE):DOUBLE],
row=[ROW(_UTF-16LE'ABC ', 3.0E0:DECIMAL(10, 2), ROW(00:00:00))],
array=[ARRAY(3:BIGINT)])
++- LogicalProject(number=[CAST(3.141592653589793E0:DOUBLE):DOUBLE],
row=[ROW(_UTF-16LE'ABC ', 3.0E0:DECIMAL(10, 2), ROW(00:00:00.1:TIME(3)))],
array=[ARRAY(3:BIGINT)])
+- LogicalValues(tuples=[[{ 0 }]])
]]>
</Resource>
@@ -255,7 +255,7 @@ LogicalUnion(all=[true])
Union(all=[true], union=[number, row, array])
:- Calc(select=[CAST(1 AS DOUBLE) AS number, ROW('A ', 2, ROW(00:00:00)) AS
row, ARRAY(1) AS array])
: +- Values(tuples=[[{ 0 }]])(reuse_id=[1])
-+- Calc(select=[CAST(3.141592653589793E0 AS DOUBLE) AS number, ROW('ABC ',
3.0E0, ROW(00:00:00)) AS row, ARRAY(3) AS array])
++- Calc(select=[CAST(3.141592653589793E0 AS DOUBLE) AS number, ROW('ABC ',
3.0E0, ROW(00:00:00.1)) AS row, ARRAY(3) AS array])
+- Reused(reference_id=[1])
]]>
</Resource>
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/TemporalTypesTest.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/TemporalTypesTest.scala
index 351a2d4095c..60e93f1004c 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/TemporalTypesTest.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/TemporalTypesTest.scala
@@ -610,7 +610,7 @@ class TemporalTypesTest extends ExpressionTestBase {
@Test
def testDateAndTime(): Unit = {
testSqlApi("DATE '2018-03-14'", "2018-03-14")
- testSqlApi("TIME '19:01:02.123'", "19:01:02")
+ testSqlApi("TIME '19:01:02.123'", "19:01:02.123")
// DATE & TIME
testSqlApi("CAST('12:44:31' AS TIME)", "12:44:31")
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/TypeCheckUtils.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/TypeCheckUtils.java
index 591e1899b9d..c203b283dc7 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/TypeCheckUtils.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/TypeCheckUtils.java
@@ -37,6 +37,7 @@ import static
org.apache.flink.table.types.logical.LogicalTypeRoot.ROW;
import static
org.apache.flink.table.types.logical.LogicalTypeRoot.STRUCTURED_TYPE;
import static
org.apache.flink.table.types.logical.LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE;
import static
org.apache.flink.table.types.logical.LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE;
+import static
org.apache.flink.table.types.logical.LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE;
import static org.apache.flink.table.types.logical.LogicalTypeRoot.VARIANT;
import static
org.apache.flink.table.types.logical.utils.LogicalTypeChecks.isRowtimeAttribute;
@@ -196,4 +197,8 @@ public class TypeCheckUtils {
return true;
}
}
+
+ public static boolean isTime(LogicalType type) {
+ return type.getTypeRoot() == TIME_WITHOUT_TIME_ZONE;
+ }
}