twalthr commented on a change in pull request #9239: [FLINK-13385]Align Hive 
data type mapping with FLIP-37
URL: https://github.com/apache/flink/pull/9239#discussion_r310006617
 
 

 ##########
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTypeUtil.java
 ##########
 @@ -83,112 +102,15 @@ public static String toHiveTypeName(DataType type) {
        public static TypeInfo toHiveTypeInfo(DataType dataType) {
                checkNotNull(dataType, "type cannot be null");
 
-               LogicalTypeRoot type = dataType.getLogicalType().getTypeRoot();
-
-               if (dataType instanceof AtomicDataType) {
-                       switch (type) {
-                               case BOOLEAN:
-                                       return TypeInfoFactory.booleanTypeInfo;
-                               case TINYINT:
-                                       return TypeInfoFactory.byteTypeInfo;
-                               case SMALLINT:
-                                       return TypeInfoFactory.shortTypeInfo;
-                               case INTEGER:
-                                       return TypeInfoFactory.intTypeInfo;
-                               case BIGINT:
-                                       return TypeInfoFactory.longTypeInfo;
-                               case FLOAT:
-                                       return TypeInfoFactory.floatTypeInfo;
-                               case DOUBLE:
-                                       return TypeInfoFactory.doubleTypeInfo;
-                               case DATE:
-                                       return TypeInfoFactory.dateTypeInfo;
-                               case TIMESTAMP_WITHOUT_TIME_ZONE:
-                                       return 
TypeInfoFactory.timestampTypeInfo;
-                               case CHAR: {
-                                       CharType charType = (CharType) 
dataType.getLogicalType();
-                                       if (charType.getLength() > 
HiveChar.MAX_CHAR_LENGTH) {
-                                               throw new CatalogException(
-                                                               
String.format("HiveCatalog doesn't support char type with length of '%d'. " +
-                                                                               
"The maximum length is %d",
-                                                                               
charType.getLength(), HiveChar.MAX_CHAR_LENGTH));
-                                       }
-                                       return 
TypeInfoFactory.getCharTypeInfo(charType.getLength());
-                               }
-                               case VARCHAR: {
-                                       VarCharType varCharType = (VarCharType) 
dataType.getLogicalType();
-                                       // Flink's StringType is defined as 
VARCHAR(Integer.MAX_VALUE)
-                                       // We don't have more information in 
LogicalTypeRoot to distinguish StringType and a VARCHAR(Integer.MAX_VALUE) 
instance
-                                       // Thus always treat 
VARCHAR(Integer.MAX_VALUE) as StringType
-                                       if (varCharType.getLength() == 
Integer.MAX_VALUE) {
-                                               return 
TypeInfoFactory.stringTypeInfo;
-                                       }
-                                       if (varCharType.getLength() > 
HiveVarchar.MAX_VARCHAR_LENGTH) {
-                                               throw new CatalogException(
-                                                               
String.format("HiveCatalog doesn't support varchar type with length of '%d'. " +
-                                                                               
"The maximum length is %d",
-                                                                               
varCharType.getLength(), HiveVarchar.MAX_VARCHAR_LENGTH));
-                                       }
-                                       return 
TypeInfoFactory.getVarcharTypeInfo(varCharType.getLength());
-                               }
-                               case DECIMAL: {
-                                       DecimalType decimalType = (DecimalType) 
dataType.getLogicalType();
-                                       // Flink and Hive share the same 
precision and scale range
-                                       // Flink already validates the type so 
we don't need to validate again here
-                                       return 
TypeInfoFactory.getDecimalTypeInfo(decimalType.getPrecision(), 
decimalType.getScale());
-                               }
-                               case VARBINARY: {
-                                       // Flink's BytesType is defined as 
VARBINARY(Integer.MAX_VALUE)
-                                       // We don't have more information in 
LogicalTypeRoot to distinguish BytesType and a VARBINARY(Integer.MAX_VALUE) 
instance
-                                       // Thus always treat 
VARBINARY(Integer.MAX_VALUE) as BytesType
-                                       VarBinaryType varBinaryType = 
(VarBinaryType) dataType.getLogicalType();
-                                       if (varBinaryType.getLength() == 
VarBinaryType.MAX_LENGTH) {
-                                               return 
TypeInfoFactory.binaryTypeInfo;
-                                       }
-                                       break;
-                               }
-                               // Flink's primitive types that Hive 2.3.4 
doesn't support: Time, TIMESTAMP_WITH_LOCAL_TIME_ZONE
-                               default:
-                                       break;
-                       }
-               }
-
-               if (dataType instanceof CollectionDataType) {
-
-                       if (type.equals(LogicalTypeRoot.ARRAY)) {
-                               DataType elementType = ((CollectionDataType) 
dataType).getElementDataType();
-
-                               return 
TypeInfoFactory.getListTypeInfo(toHiveTypeInfo(elementType));
-                       }
-
-                       // Flink's collection types that Hive 2.3.4 doesn't 
support: multiset
-               }
-
-               if (dataType instanceof KeyValueDataType) {
-                       KeyValueDataType keyValueDataType = (KeyValueDataType) 
dataType;
-                       DataType keyType = keyValueDataType.getKeyDataType();
-                       DataType valueType = 
keyValueDataType.getValueDataType();
-
-                       return 
TypeInfoFactory.getMapTypeInfo(toHiveTypeInfo(keyType), 
toHiveTypeInfo(valueType));
-               }
-
-               if (dataType instanceof FieldsDataType) {
-                       FieldsDataType fieldsDataType = (FieldsDataType) 
dataType;
-                       // need to retrieve field names in order
-                       List<String> names = ((RowType) 
fieldsDataType.getLogicalType()).getFieldNames();
-
-                       Map<String, DataType> nameToType = 
fieldsDataType.getFieldDataTypes();
-                       List<TypeInfo> typeInfos = new 
ArrayList<>(names.size());
-
-                       for (String name : names) {
-                               
typeInfos.add(toHiveTypeInfo(nameToType.get(name)));
-                       }
-
-                       return TypeInfoFactory.getStructTypeInfo(names, 
typeInfos);
+//             LogicalTypeRoot type = dataType.getLogicalType().getTypeRoot();
 
 Review comment:
   remove this line?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to