This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.10 by this push:
new 3c339da [FLINK-15050][table-planner-blink] DataFormatConverters
should support any TIMESTAMP WITHOUT TIME ZONE types
3c339da is described below
commit 3c339dad1db229099236997880eb1f951ab69e08
Author: Zhenghua Gao <[email protected]>
AuthorDate: Wed Dec 4 17:08:06 2019 +0800
[FLINK-15050][table-planner-blink] DataFormatConverters should support any
TIMESTAMP WITHOUT TIME ZONE types
This closes #10418
---
.../table/dataformat/DataFormatConverters.java | 32 ++++++++------
.../table/dataformat/DataFormatConvertersTest.java | 51 ++++++++++++++++++++++
2 files changed, 70 insertions(+), 13 deletions(-)
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/DataFormatConverters.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/DataFormatConverters.java
index 41216a4..7e9c422 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/DataFormatConverters.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/DataFormatConverters.java
@@ -121,9 +121,6 @@ public class DataFormatConverters {
t2C.put(DataTypes.TIME().bridgedTo(Integer.class),
IntConverter.INSTANCE);
t2C.put(DataTypes.TIME().bridgedTo(int.class),
IntConverter.INSTANCE);
- t2C.put(DataTypes.TIMESTAMP(3).bridgedTo(Timestamp.class), new
TimestampConverter(3));
- t2C.put(DataTypes.TIMESTAMP(3).bridgedTo(LocalDateTime.class),
new LocalDateTimeConverter(3));
-
t2C.put(DataTypes.INTERVAL(DataTypes.MONTH()).bridgedTo(Integer.class),
IntConverter.INSTANCE);
t2C.put(DataTypes.INTERVAL(DataTypes.MONTH()).bridgedTo(int.class),
IntConverter.INSTANCE);
@@ -169,14 +166,23 @@ public class DataFormatConverters {
} else {
return new DecimalConverter(ps.f0,
ps.f1);
}
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ int precisionOfTS =
getDateTimePrecision(logicalType);
+ if (clazz == Timestamp.class) {
+ return new
TimestampConverter(precisionOfTS);
+ } else if (clazz == LocalDateTime.class) {
+ return new
LocalDateTimeConverter(precisionOfTS);
+ } else {
+ return new
SqlTimestampConverter(precisionOfTS);
+ }
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
- int precision =
getDateTimePrecision(logicalType);
+ int precisionOfLZTS =
getDateTimePrecision(logicalType);
if (clazz == Instant.class) {
- return new InstantConverter(precision);
+ return new
InstantConverter(precisionOfLZTS);
} else if (clazz == Long.class || clazz ==
long.class) {
- return new
LongSqlTimestampConverter(precision);
+ return new
LongSqlTimestampConverter(precisionOfLZTS);
} else {
- return new
SqlTimestampConverter(precision);
+ return new
SqlTimestampConverter(precisionOfLZTS);
}
case ARRAY:
if (clazz == BinaryArray.class) {
@@ -264,11 +270,6 @@ public class DataFormatConverters {
return BinaryGenericConverter.INSTANCE;
}
return new
GenericConverter(typeInfo.createSerializer(new ExecutionConfig()));
- case TIMESTAMP_WITHOUT_TIME_ZONE:
- if
(dataType.getConversionClass().equals(LocalDateTime.class)) {
- return new
LocalDateTimeConverter(((TimestampType) logicalType).getPrecision());
- }
- return new TimestampConverter(((TimestampType)
logicalType).getPrecision());
default:
throw new RuntimeException("Not support
dataType: " + dataType);
}
@@ -301,12 +302,17 @@ public class DataFormatConverters {
private static int getDateTimePrecision(LogicalType logicalType) {
if (logicalType instanceof LocalZonedTimestampType) {
return ((LocalZonedTimestampType)
logicalType).getPrecision();
+ } else if (logicalType instanceof TimestampType) {
+ return ((TimestampType) logicalType).getPrecision();
} else {
TypeInformation typeInfo = ((LegacyTypeInformationType)
logicalType).getTypeInformation();
if (typeInfo instanceof LegacyInstantTypeInfo) {
return ((LegacyInstantTypeInfo)
typeInfo).getPrecision();
+ } else if (typeInfo instanceof
LegacyLocalDateTimeTypeInfo) {
+ return ((LegacyLocalDateTimeTypeInfo)
typeInfo).getPrecision();
} else {
- return
LocalZonedTimestampType.DEFAULT_PRECISION;
+ // TimestampType.DEFAULT_PRECISION == LocalZonedTimestampType.DEFAULT_PRECISION == 6
+ return TimestampType.DEFAULT_PRECISION;
}
}
}
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/DataFormatConvertersTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/DataFormatConvertersTest.java
index 1d42b5f..2ca2c87 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/DataFormatConvertersTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/DataFormatConvertersTest.java
@@ -22,6 +22,7 @@ import
org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.LocalTimeTypeInfo;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
@@ -31,12 +32,18 @@ import
org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.table.api.DataTypes;
import
org.apache.flink.table.dataformat.DataFormatConverters.DataFormatConverter;
import org.apache.flink.table.runtime.functions.SqlDateTimeUtils;
import org.apache.flink.table.runtime.typeutils.BaseRowTypeInfo;
import org.apache.flink.table.runtime.typeutils.BinaryStringTypeInfo;
import org.apache.flink.table.runtime.typeutils.DecimalTypeInfo;
+import org.apache.flink.table.runtime.typeutils.LegacyTimestampTypeInfo;
+import org.apache.flink.table.types.AtomicDataType;
+import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LegacyTypeInformationType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.Row;
@@ -44,6 +51,8 @@ import org.apache.flink.types.Row;
import org.junit.Assert;
import org.junit.Test;
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.HashMap;
@@ -108,6 +117,30 @@ public class DataFormatConvertersTest {
BinaryString.fromString("hahah")
};
+ private DataType[] dataTypes = new DataType[] {
+ DataTypes.TIMESTAMP(9).bridgedTo(LocalDateTime.class),
+ DataTypes.TIMESTAMP(9).bridgedTo(Timestamp.class),
+ DataTypes.TIMESTAMP(3),
+ new AtomicDataType(
+ new LegacyTypeInformationType<>(
+ LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE,
+ SqlTimeTypeInfo.TIMESTAMP)),
+ new AtomicDataType(
+ new LegacyTypeInformationType<>(
+ LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE,
+ new LegacyTimestampTypeInfo(7))),
+ DataTypes.TIMESTAMP(3).bridgedTo(SqlTimestamp.class)
+ };
+
+ private Object[] dataValues = new Object[] {
+ LocalDateTime.of(1970, 1, 1, 0, 0, 0, 123456789),
+ Timestamp.valueOf("1970-01-01 00:00:00.123456789"),
+ LocalDateTime.of(1970, 1, 1, 0, 0, 0, 123),
+ Timestamp.valueOf("1970-01-01 00:00:00.123"),
+ Timestamp.valueOf("1970-01-01 00:00:00.1234567"),
+ SqlTimestamp.fromEpochMillis(1000L)
+ };
+
private static DataFormatConverter getConverter(TypeInformation
typeInfo) {
return
getConverterForDataType(TypeConversions.fromLegacyInfoToDataType(typeInfo));
}
@@ -118,6 +151,17 @@ public class DataFormatConvertersTest {
new Object[]
{converter.toExternal(converter.toInternal(value))}, new Object[] {value}));
}
+ private static DataFormatConverter getConverter(DataType dataType) {
+ return getConverterForDataType(dataType);
+ }
+
+ private static void testDataType(DataType dataType, Object value) {
+ DataFormatConverter converter = getConverter(dataType);
+ Assert.assertTrue(Arrays.deepEquals(
+ new Object[]
{converter.toExternal(converter.toInternal(value))}, new Object[]{value}
+ ));
+ }
+
@Test
public void testTypes() {
for (int i = 0; i < simpleTypes.length; i++) {
@@ -162,6 +206,13 @@ public class DataFormatConvertersTest {
test(TypeExtractor.createTypeInfo(MyPojo.class), new MyPojo(1,
3));
}
+ @Test
+ public void testDataTypes() {
+ for (int i = 0; i < dataTypes.length; i++) {
+ testDataType(dataTypes[i], dataValues[i]);
+ }
+ }
+
/**
* Test pojo.
*/