This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.10 by this push:
     new 3c339da  [FLINK-15050][table-planner-blink] DataFormatConverters should support any TIMESTAMP WITHOUT TIME ZONE types
3c339da is described below

commit 3c339dad1db229099236997880eb1f951ab69e08
Author: Zhenghua Gao <[email protected]>
AuthorDate: Wed Dec 4 17:08:06 2019 +0800

    [FLINK-15050][table-planner-blink] DataFormatConverters should support any TIMESTAMP WITHOUT TIME ZONE types
    
    This closes #10418
---
 .../table/dataformat/DataFormatConverters.java     | 32 ++++++++------
 .../table/dataformat/DataFormatConvertersTest.java | 51 ++++++++++++++++++++++
 2 files changed, 70 insertions(+), 13 deletions(-)

diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/DataFormatConverters.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/DataFormatConverters.java
index 41216a4..7e9c422 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/DataFormatConverters.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/DataFormatConverters.java
@@ -121,9 +121,6 @@ public class DataFormatConverters {
                t2C.put(DataTypes.TIME().bridgedTo(Integer.class), 
IntConverter.INSTANCE);
                t2C.put(DataTypes.TIME().bridgedTo(int.class), 
IntConverter.INSTANCE);
 
-               t2C.put(DataTypes.TIMESTAMP(3).bridgedTo(Timestamp.class), new 
TimestampConverter(3));
-               t2C.put(DataTypes.TIMESTAMP(3).bridgedTo(LocalDateTime.class), 
new LocalDateTimeConverter(3));
-
                
t2C.put(DataTypes.INTERVAL(DataTypes.MONTH()).bridgedTo(Integer.class), 
IntConverter.INSTANCE);
                
t2C.put(DataTypes.INTERVAL(DataTypes.MONTH()).bridgedTo(int.class), 
IntConverter.INSTANCE);
 
@@ -169,14 +166,23 @@ public class DataFormatConverters {
                                } else {
                                        return new DecimalConverter(ps.f0, 
ps.f1);
                                }
+                       case TIMESTAMP_WITHOUT_TIME_ZONE:
+                               int precisionOfTS = 
getDateTimePrecision(logicalType);
+                               if (clazz == Timestamp.class) {
+                                       return new 
TimestampConverter(precisionOfTS);
+                               } else if (clazz == LocalDateTime.class) {
+                                       return new 
LocalDateTimeConverter(precisionOfTS);
+                               } else {
+                                       return new 
SqlTimestampConverter(precisionOfTS);
+                               }
                        case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
-                               int precision = 
getDateTimePrecision(logicalType);
+                               int precisionOfLZTS = 
getDateTimePrecision(logicalType);
                                if (clazz == Instant.class) {
-                                       return new InstantConverter(precision);
+                                       return new 
InstantConverter(precisionOfLZTS);
                                } else if (clazz == Long.class || clazz == 
long.class) {
-                                       return new 
LongSqlTimestampConverter(precision);
+                                       return new 
LongSqlTimestampConverter(precisionOfLZTS);
                                } else {
-                                       return new 
SqlTimestampConverter(precision);
+                                       return new 
SqlTimestampConverter(precisionOfLZTS);
                                }
                        case ARRAY:
                                if (clazz == BinaryArray.class) {
@@ -264,11 +270,6 @@ public class DataFormatConverters {
                                        return BinaryGenericConverter.INSTANCE;
                                }
                                return new 
GenericConverter(typeInfo.createSerializer(new ExecutionConfig()));
-                       case TIMESTAMP_WITHOUT_TIME_ZONE:
-                               if 
(dataType.getConversionClass().equals(LocalDateTime.class)) {
-                                       return new 
LocalDateTimeConverter(((TimestampType) logicalType).getPrecision());
-                               }
-                               return new TimestampConverter(((TimestampType) 
logicalType).getPrecision());
                        default:
                                throw new RuntimeException("Not support 
dataType: " + dataType);
                }
@@ -301,12 +302,17 @@ public class DataFormatConverters {
        private static int getDateTimePrecision(LogicalType logicalType) {
                if (logicalType instanceof LocalZonedTimestampType) {
                        return ((LocalZonedTimestampType) 
logicalType).getPrecision();
+               } else if (logicalType instanceof TimestampType) {
+                       return ((TimestampType) logicalType).getPrecision();
                } else {
                        TypeInformation typeInfo = ((LegacyTypeInformationType) 
logicalType).getTypeInformation();
                        if (typeInfo instanceof LegacyInstantTypeInfo) {
                                return ((LegacyInstantTypeInfo) 
typeInfo).getPrecision();
+                       } else if (typeInfo instanceof 
LegacyLocalDateTimeTypeInfo) {
+                               return ((LegacyLocalDateTimeTypeInfo) 
typeInfo).getPrecision();
                        } else {
-                               return 
LocalZonedTimestampType.DEFAULT_PRECISION;
+                               // TimestampType.DEFAULT_PRECISION == 
LocalZonedTimestampType.DEFAULT_PRECISION == 6
+                               return TimestampType.DEFAULT_PRECISION;
                        }
                }
        }
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/DataFormatConvertersTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/DataFormatConvertersTest.java
index 1d42b5f..2ca2c87 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/DataFormatConvertersTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/DataFormatConvertersTest.java
@@ -22,6 +22,7 @@ import 
org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.LocalTimeTypeInfo;
 import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -31,12 +32,18 @@ import 
org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.table.api.DataTypes;
 import 
org.apache.flink.table.dataformat.DataFormatConverters.DataFormatConverter;
 import org.apache.flink.table.runtime.functions.SqlDateTimeUtils;
 import org.apache.flink.table.runtime.typeutils.BaseRowTypeInfo;
 import org.apache.flink.table.runtime.typeutils.BinaryStringTypeInfo;
 import org.apache.flink.table.runtime.typeutils.DecimalTypeInfo;
+import org.apache.flink.table.runtime.typeutils.LegacyTimestampTypeInfo;
+import org.apache.flink.table.types.AtomicDataType;
+import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LegacyTypeInformationType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
 import org.apache.flink.table.types.logical.VarCharType;
 import org.apache.flink.table.types.utils.TypeConversions;
 import org.apache.flink.types.Row;
@@ -44,6 +51,8 @@ import org.apache.flink.types.Row;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
 import java.util.Arrays;
 import java.util.HashMap;
 
@@ -108,6 +117,30 @@ public class DataFormatConvertersTest {
                        BinaryString.fromString("hahah")
        };
 
+       private DataType[] dataTypes = new DataType[] {
+               DataTypes.TIMESTAMP(9).bridgedTo(LocalDateTime.class),
+               DataTypes.TIMESTAMP(9).bridgedTo(Timestamp.class),
+               DataTypes.TIMESTAMP(3),
+               new AtomicDataType(
+                       new LegacyTypeInformationType<>(
+                               LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE,
+                               SqlTimeTypeInfo.TIMESTAMP)),
+               new AtomicDataType(
+                       new LegacyTypeInformationType<>(
+                               LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE,
+                               new LegacyTimestampTypeInfo(7))),
+               DataTypes.TIMESTAMP(3).bridgedTo(SqlTimestamp.class)
+       };
+
+       private Object[] dataValues = new Object[] {
+               LocalDateTime.of(1970, 1, 1, 0, 0, 0, 123456789),
+               Timestamp.valueOf("1970-01-01 00:00:00.123456789"),
+               LocalDateTime.of(1970, 1, 1, 0, 0, 0, 123),
+               Timestamp.valueOf("1970-01-01 00:00:00.123"),
+               Timestamp.valueOf("1970-01-01 00:00:00.1234567"),
+               SqlTimestamp.fromEpochMillis(1000L)
+       };
+
        private static DataFormatConverter getConverter(TypeInformation 
typeInfo) {
                return 
getConverterForDataType(TypeConversions.fromLegacyInfoToDataType(typeInfo));
        }
@@ -118,6 +151,17 @@ public class DataFormatConvertersTest {
                                new Object[] 
{converter.toExternal(converter.toInternal(value))}, new Object[] {value}));
        }
 
+       private static DataFormatConverter getConverter(DataType dataType) {
+               return getConverterForDataType(dataType);
+       }
+
+       private static void testDataType(DataType dataType, Object value) {
+               DataFormatConverter converter = getConverter(dataType);
+               Assert.assertTrue(Arrays.deepEquals(
+                       new Object[] 
{converter.toExternal(converter.toInternal(value))}, new Object[]{value}
+               ));
+       }
+
        @Test
        public void testTypes() {
                for (int i = 0; i < simpleTypes.length; i++) {
@@ -162,6 +206,13 @@ public class DataFormatConvertersTest {
                test(TypeExtractor.createTypeInfo(MyPojo.class), new MyPojo(1, 
3));
        }
 
+       @Test
+       public void testDataTypes() {
+               for (int i = 0; i < dataTypes.length; i++) {
+                       testDataType(dataTypes[i], dataValues[i]);
+               }
+       }
+
        /**
         * Test pojo.
         */

Reply via email to