wuchong commented on a change in pull request #7904: 
[FLINK-11827][table-runtime-blink] Introduce DataFormatConverters to convert 
internal data format and java format
URL: https://github.com/apache/flink/pull/7904#discussion_r262782786
 
 

 ##########
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/DataFormatConverters.java
 ##########
 @@ -0,0 +1,1022 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.     See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.        You may obtain a copy of the License at
+ *
+ *             http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.PojoField;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase;
+import org.apache.flink.table.type.InternalType;
+import org.apache.flink.table.type.TypeConverters;
+import org.apache.flink.table.typeutils.BaseRowTypeInfo;
+import org.apache.flink.table.typeutils.BinaryArrayTypeInfo;
+import org.apache.flink.table.typeutils.BinaryMapTypeInfo;
+import org.apache.flink.table.typeutils.BinaryStringTypeInfo;
+import org.apache.flink.types.Row;
+
+import java.lang.reflect.Array;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TimeZone;
+
+import scala.Product;
+
+/**
+ * Converters between internal data format and java format.
+ */
+public class DataFormatConverters {
+
+       private static final Map<TypeInformation, DataFormatConverter> 
TYPE_INFO_TO_CONVERTER;
+       static {
+               Map<TypeInformation, DataFormatConverter> t2C = new HashMap<>();
+               t2C.put(BasicTypeInfo.STRING_TYPE_INFO, 
StringConverter.INSTANCE);
+               t2C.put(BasicTypeInfo.BOOLEAN_TYPE_INFO, 
BooleanConverter.INSTANCE);
+               t2C.put(BasicTypeInfo.INT_TYPE_INFO, IntConverter.INSTANCE);
+               t2C.put(BasicTypeInfo.LONG_TYPE_INFO, LongConverter.INSTANCE);
+               t2C.put(BasicTypeInfo.FLOAT_TYPE_INFO, FloatConverter.INSTANCE);
+               t2C.put(BasicTypeInfo.DOUBLE_TYPE_INFO, 
DoubleConverter.INSTANCE);
+               t2C.put(BasicTypeInfo.SHORT_TYPE_INFO, ShortConverter.INSTANCE);
+               t2C.put(BasicTypeInfo.BYTE_TYPE_INFO, ByteConverter.INSTANCE);
+               t2C.put(BasicTypeInfo.CHAR_TYPE_INFO, CharConverter.INSTANCE);
+
+               
t2C.put(PrimitiveArrayTypeInfo.BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO, 
PrimitiveBooleanArrayConverter.INSTANCE);
+               t2C.put(PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO, 
PrimitiveIntArrayConverter.INSTANCE);
+               t2C.put(PrimitiveArrayTypeInfo.LONG_PRIMITIVE_ARRAY_TYPE_INFO, 
PrimitiveLongArrayConverter.INSTANCE);
+               t2C.put(PrimitiveArrayTypeInfo.FLOAT_PRIMITIVE_ARRAY_TYPE_INFO, 
PrimitiveFloatArrayConverter.INSTANCE);
+               
t2C.put(PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO, 
PrimitiveDoubleArrayConverter.INSTANCE);
+               t2C.put(PrimitiveArrayTypeInfo.SHORT_PRIMITIVE_ARRAY_TYPE_INFO, 
PrimitiveShortArrayConverter.INSTANCE);
+               t2C.put(PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO, 
PrimitiveByteArrayConverter.INSTANCE);
+               t2C.put(PrimitiveArrayTypeInfo.CHAR_PRIMITIVE_ARRAY_TYPE_INFO, 
PrimitiveCharArrayConverter.INSTANCE);
+
+               t2C.put(SqlTimeTypeInfo.DATE, DateConverter.INSTANCE);
+               t2C.put(SqlTimeTypeInfo.TIME, TimeConverter.INSTANCE);
+               t2C.put(SqlTimeTypeInfo.TIMESTAMP, TimestampConverter.INSTANCE);
+
+               t2C.put(BinaryStringTypeInfo.INSTANCE, 
BinaryStringConverter.INSTANCE);
+
+               TYPE_INFO_TO_CONVERTER = Collections.unmodifiableMap(t2C);
+       }
+
+       public static DataFormatConverter 
getConverterForTypeInfo(TypeInformation typeInfo) {
+               DataFormatConverter converter = 
TYPE_INFO_TO_CONVERTER.get(typeInfo);
+               if (converter != null) {
+                       return converter;
+               }
+
+               if (typeInfo instanceof BasicArrayTypeInfo) {
+                       return new ObjectArrayConverter(((BasicArrayTypeInfo) 
typeInfo).getComponentInfo());
+               } else if (typeInfo instanceof ObjectArrayTypeInfo) {
+                       return new ObjectArrayConverter(((ObjectArrayTypeInfo) 
typeInfo).getComponentInfo());
+               } else if (typeInfo instanceof MapTypeInfo) {
+                       MapTypeInfo mapType = (MapTypeInfo) typeInfo;
+                       return new MapConverter(mapType.getKeyTypeInfo(), 
mapType.getValueTypeInfo());
+               } else if (typeInfo instanceof RowTypeInfo) {
+                       return new RowConverter((RowTypeInfo) typeInfo);
+               } else if (typeInfo instanceof PojoTypeInfo) {
+                       return new PojoConverter((PojoTypeInfo) typeInfo);
+               } else if (typeInfo instanceof TupleTypeInfo) {
+                       return new TupleConverter((TupleTypeInfo) typeInfo);
+               } else if (typeInfo instanceof TupleTypeInfoBase && 
Product.class.isAssignableFrom(typeInfo.getTypeClass())) {
+                       return new CaseClassConverter((TupleTypeInfoBase) 
typeInfo);
+               } else if (typeInfo instanceof BinaryArrayTypeInfo) {
+                       return BinaryArrayConverter.INSTANCE;
+               } else if (typeInfo instanceof BinaryMapTypeInfo) {
+                       return BinaryMapConverter.INSTANCE;
+               } else if (typeInfo instanceof BaseRowTypeInfo) {
+                       return BaseRowConverter.INSTANCE;
+               } else {
+                       throw new RuntimeException("Not support generic yet: " 
+ typeInfo);
+               }
+       }
+
+       /**
+        * Converter between internal data format and java format.
+        * @param <Internal> Internal data format.
+        * @param <External> External data format.
+        */
+       public abstract static class DataFormatConverter<Internal, External> {
+
+               /**
+                * Converts a external(Java) data format to its internal 
equivalent while automatically handling nulls.
+                */
+               public final Internal toInternal(External value) {
+                       return value == null ? null : toInternalImpl(value);
+               }
+
+               /**
+                * Converts a non-null external(Java) data format to its 
internal equivalent.
+                */
+               abstract Internal toInternalImpl(External value);
+
+               /**
+                * Convert a internal data format to its external(Java) 
equivalent while automatically handling nulls.
+                */
+               public final External toExternal(Internal value) {
+                       return value == null ? null : toExternalImpl(value);
+               }
+
+               /**
+                * Convert a non-null internal data format to its 
external(Java) equivalent.
+                */
+               abstract External toExternalImpl(Internal value);
+
+               /**
+                * Given a internalType row, convert the value at column 
`column` to its external(Java) equivalent.
+                * This method will only be called on non-null columns.
+                */
+               abstract External toExternalImpl(BaseRow row, int column);
+
+               /**
+                * Given a internalType row, convert the value at column 
`column` to its external(Java) equivalent.
+                */
+               public final External toExternal(BaseRow row, int column) {
+                       return row.isNullAt(column) ? null : 
toExternalImpl(row, column);
+               }
+       }
+
+       /**
+        * Identity converter.
+        */
+       public abstract static class IdentityConverter<T> extends 
DataFormatConverter<T, T> {
+
+               @Override
+               T toInternalImpl(T value) {
+                       return value;
+               }
+
+               @Override
+               T toExternalImpl(T value) {
+                       return value;
+               }
+       }
+
+       /**
+        * Converter for boolean.
+        */
+       public static class BooleanConverter extends IdentityConverter<Boolean> 
{
+
+               public static final BooleanConverter INSTANCE = new 
BooleanConverter();
+
+               private BooleanConverter() {}
+
+               @Override
+               Boolean toExternalImpl(BaseRow row, int column) {
+                       return row.getBoolean(column);
+               }
+       }
+
+       /**
+        * Converter for byte.
+        */
+       public static class ByteConverter extends IdentityConverter<Byte> {
+
+               public static final ByteConverter INSTANCE = new 
ByteConverter();
+
+               private ByteConverter() {}
+
+               @Override
+               Byte toExternalImpl(BaseRow row, int column) {
+                       return row.getByte(column);
+               }
+       }
+
+       /**
+        * Converter for short.
+        */
+       public static class ShortConverter extends IdentityConverter<Short> {
+
+               public static final ShortConverter INSTANCE = new 
ShortConverter();
+
+               private ShortConverter() {}
+
+               @Override
+               Short toExternalImpl(BaseRow row, int column) {
+                       return row.getShort(column);
+               }
+       }
+
+       /**
+        * Converter for int.
+        */
+       public static class IntConverter extends IdentityConverter<Integer> {
+
+               public static final IntConverter INSTANCE = new IntConverter();
+
+               private IntConverter() {}
+
+               @Override
+               Integer toExternalImpl(BaseRow row, int column) {
+                       return row.getInt(column);
+               }
+       }
+
+       /**
+        * Converter for long.
+        */
+       public static class LongConverter extends IdentityConverter<Long> {
+
+               public static final LongConverter INSTANCE = new 
LongConverter();
+
+               private LongConverter() {}
+
+               @Override
+               Long toExternalImpl(BaseRow row, int column) {
+                       return row.getLong(column);
+               }
+       }
+
+       /**
+        * Converter for float.
+        */
+       public static class FloatConverter extends IdentityConverter<Float> {
+
+               public static final FloatConverter INSTANCE = new 
FloatConverter();
+
+               private FloatConverter() {}
+
+               @Override
+               Float toExternalImpl(BaseRow row, int column) {
+                       return row.getFloat(column);
+               }
+       }
+
+       /**
+        * Converter for double.
+        */
+       public static class DoubleConverter extends IdentityConverter<Double> {
+
+               public static final DoubleConverter INSTANCE = new 
DoubleConverter();
+
+               private DoubleConverter() {}
+
+               @Override
+               Double toExternalImpl(BaseRow row, int column) {
+                       return row.getDouble(column);
+               }
+       }
+
+       /**
+        * Converter for char.
+        */
+       public static class CharConverter extends IdentityConverter<Character> {
+
+               public static final CharConverter INSTANCE = new 
CharConverter();
+
+               private CharConverter() {}
+
+               @Override
+               Character toExternalImpl(BaseRow row, int column) {
+                       return row.getChar(column);
+               }
+       }
+
+       /**
+        * Converter for BinaryString.
+        */
+       public static class BinaryStringConverter extends 
IdentityConverter<BinaryString> {
+
+               public static final BinaryStringConverter INSTANCE = new 
BinaryStringConverter();
+
+               private BinaryStringConverter() {}
+
+               @Override
+               BinaryString toExternalImpl(BaseRow row, int column) {
+                       return row.getString(column);
+               }
+       }
+
+       /**
+        * Converter for BinaryArray.
+        */
+       public static class BinaryArrayConverter extends 
IdentityConverter<BinaryArray> {
+
+               public static final BinaryArrayConverter INSTANCE = new 
BinaryArrayConverter();
+
+               private BinaryArrayConverter() {}
+
+               @Override
+               BinaryArray toExternalImpl(BaseRow row, int column) {
+                       return row.getArray(column);
+               }
+       }
+
+       /**
+        * Converter for BinaryMap.
+        */
+       public static class BinaryMapConverter extends 
IdentityConverter<BinaryMap> {
+
+               public static final BinaryMapConverter INSTANCE = new 
BinaryMapConverter();
+
+               private BinaryMapConverter() {}
+
+               @Override
+               BinaryMap toExternalImpl(BaseRow row, int column) {
+                       return row.getMap(column);
+               }
+       }
+
+       /**
+        * Converter for String.
+        */
+       public static class StringConverter extends 
DataFormatConverter<BinaryString, String> {
+
+               public static final StringConverter INSTANCE = new 
StringConverter();
+
+               private StringConverter() {}
+
+               @Override
+               BinaryString toInternalImpl(String value) {
+                       return BinaryString.fromString(value);
+               }
+
+               @Override
+               String toExternalImpl(BinaryString value) {
+                       return value.toString();
+               }
+
+               @Override
+               String toExternalImpl(BaseRow row, int column) {
+                       return row.getString(column).toString();
+               }
+       }
+
+       public static final TimeZone LOCAL_TZ = TimeZone.getDefault();
+       public static final long MILLIS_PER_DAY = 86400000L; // = 24 * 60 * 60 
* 1000
+
+       /** Converts the Java type used for UDF parameters of SQL TIME type
+        * ({@link java.sql.Time}) to internal representation (int).
+        *
+        * <p>Converse of {@link #internalToTime(int)}. */
+       public static int toInt(java.sql.Time v) {
+               return (int) (toLong(v) % MILLIS_PER_DAY);
+       }
+
+       /** Converts the Java type used for UDF parameters of SQL DATE type
+        * ({@link java.sql.Date}) to internal representation (int).
+        *
+        * <p>Converse of {@link #internalToDate(int)}. */
+       public static int toInt(java.util.Date v) {
+               return toInt(v, LOCAL_TZ);
+       }
+
+       public static int toInt(java.util.Date v, TimeZone timeZone) {
+               return (int) (toLong(v, timeZone) / MILLIS_PER_DAY);
+       }
+
+       public static long toLong(java.util.Date v) {
+               return toLong(v, LOCAL_TZ);
+       }
+
+       /** Converts the Java type used for UDF parameters of SQL TIMESTAMP type
+        * ({@link java.sql.Timestamp}) to internal representation (long).
+        *
+        * <p>Converse of {@link #internalToTimestamp(long)}. */
+       public static long toLong(Timestamp v) {
+               return toLong(v, LOCAL_TZ);
+       }
+
+       public static long toLong(java.util.Date v, TimeZone timeZone) {
+               long time = v.getTime();
+               return time + (long) timeZone.getOffset(time);
+       }
+
+       /** Converts the internal representation of a SQL DATE (int) to the Java
+        * type used for UDF parameters ({@link java.sql.Date}). */
+       public static java.sql.Date internalToDate(int v) {
 
 Review comment:
   Could we move the date/time-related utilities to a dedicated place, such as 
`org.apache.flink.table.runtime.functions.DateTimeUtils`?
   
   Currently, it is a bit messy to have date/time functions scattered across various classes.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to