twalthr commented on a change in pull request #9239: [FLINK-13385]Align Hive 
data type mapping with FLIP-37
URL: https://github.com/apache/flink/pull/9239#discussion_r310007291
 
 

 ##########
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTypeUtil.java
 ##########
 @@ -276,4 +198,202 @@ private static DataType 
toFlinkPrimitiveType(PrimitiveTypeInfo hiveType) {
                                        String.format("Flink doesn't support 
Hive primitive type %s yet", hiveType));
                }
        }
+
+       private static class TypeInfoLogicalTypeVisitor implements 
LogicalTypeVisitor<TypeInfo> {
+               @Override
+               public TypeInfo visit(CharType charType) {
+                       if (charType.getLength() > HiveChar.MAX_CHAR_LENGTH) {
+                               throw new CatalogException(
+                                               String.format("HiveCatalog 
doesn't support char type with length of '%d'. " +
+                                                                       "The 
maximum length is %d",
+                                                                       
charType.getLength(), HiveChar.MAX_CHAR_LENGTH));
+                       }
+                       return 
TypeInfoFactory.getCharTypeInfo(charType.getLength());
+               }
+
+               @Override
+               public TypeInfo visit(VarCharType varCharType) {
+                       // Flink's StringType is defined as 
VARCHAR(Integer.MAX_VALUE)
+                       // We don't have more information in LogicalTypeRoot to 
distinguish StringType and a VARCHAR(Integer.MAX_VALUE) instance
+                       // Thus always treat VARCHAR(Integer.MAX_VALUE) as 
StringType
+                       if (varCharType.getLength() == Integer.MAX_VALUE) {
+                               return TypeInfoFactory.stringTypeInfo;
+                       }
+                       if (varCharType.getLength() > 
HiveVarchar.MAX_VARCHAR_LENGTH) {
+                               throw new CatalogException(
+                                               String.format("HiveCatalog 
doesn't support varchar type with length of '%d'. " +
+                                                                       "The 
maximum length is %d",
+                                                                       
varCharType.getLength(), HiveVarchar.MAX_VARCHAR_LENGTH));
+                       }
+                       return 
TypeInfoFactory.getVarcharTypeInfo(varCharType.getLength());
+               }
+
+               @Override
+               public TypeInfo visit(BooleanType booleanType) {
+                       return TypeInfoFactory.booleanTypeInfo;
+               }
+
+               @Override
+               public TypeInfo visit(BinaryType binaryType) {
+                       return null;
+               }
+
+               @Override
+               public TypeInfo visit(VarBinaryType varBinaryType) {
+                       // Flink's BytesType is defined as 
VARBINARY(Integer.MAX_VALUE)
+                       // We don't have more information in LogicalTypeRoot to 
distinguish BytesType and a VARBINARY(Integer.MAX_VALUE) instance
+                       // Thus always treat VARBINARY(Integer.MAX_VALUE) as 
BytesType
+                       if (varBinaryType.getLength() == 
VarBinaryType.MAX_LENGTH) {
+                               return TypeInfoFactory.binaryTypeInfo;
+                       }
+                       return null;
+               }
+
+               @Override
+               public TypeInfo visit(DecimalType decimalType) {
+                       // Flink and Hive share the same precision and scale 
range
+                       // Flink already validates the type so we don't need to 
validate again here
+                       return 
TypeInfoFactory.getDecimalTypeInfo(decimalType.getPrecision(), 
decimalType.getScale());
+               }
+
+               @Override
+               public TypeInfo visit(TinyIntType tinyIntType) {
+                       return TypeInfoFactory.byteTypeInfo;
+               }
+
+               @Override
+               public TypeInfo visit(SmallIntType smallIntType) {
+                       return TypeInfoFactory.shortTypeInfo;
+               }
+
+               @Override
+               public TypeInfo visit(IntType intType) {
+                       return TypeInfoFactory.intTypeInfo;
+               }
+
+               @Override
+               public TypeInfo visit(BigIntType bigIntType) {
+                       return TypeInfoFactory.longTypeInfo;
+               }
+
+               @Override
+               public TypeInfo visit(FloatType floatType) {
+                       return TypeInfoFactory.floatTypeInfo;
+               }
+
+               @Override
+               public TypeInfo visit(DoubleType doubleType) {
+                       return TypeInfoFactory.doubleTypeInfo;
+               }
+
+               @Override
+               public TypeInfo visit(DateType dateType) {
+                       return TypeInfoFactory.dateTypeInfo;
+               }
+
+               @Override
+               public TypeInfo visit(TimeType timeType) {
+                       return null;
+               }
+
+               @Override
+               public TypeInfo visit(TimestampType timestampType) {
+                       return TypeInfoFactory.timestampTypeInfo;
+               }
+
+               @Override
+               public TypeInfo visit(ZonedTimestampType zonedTimestampType) {
+                       return null;
+               }
+
+               @Override
+               public TypeInfo visit(LocalZonedTimestampType 
localZonedTimestampType) {
+                       return null;
+               }
+
+               @Override
+               public TypeInfo visit(YearMonthIntervalType 
yearMonthIntervalType) {
+                       return null;
+               }
+
+               @Override
+               public TypeInfo visit(DayTimeIntervalType dayTimeIntervalType) {
+                       return null;
+               }
+
+               @Override
+               public TypeInfo visit(ArrayType arrayType) {
+                       LogicalType elementType = arrayType.getElementType();
+                       TypeInfo elementTypeInfo = elementType.accept(new 
TypeInfoLogicalTypeVisitor());
+                       if (null != elementTypeInfo) {
+                               return 
TypeInfoFactory.getListTypeInfo(elementTypeInfo);
+                       } else {
+                               return null;
+                       }
+               }
+
+               @Override
+               public TypeInfo visit(MultisetType multisetType) {
+                       return null;
+               }
+
+               @Override
+               public TypeInfo visit(MapType mapType) {
+                       LogicalType keyType  = mapType.getKeyType();
+                       LogicalType valueType = mapType.getValueType();
+                       TypeInfo keyTypeInfo = keyType.accept(new 
TypeInfoLogicalTypeVisitor());
+                       TypeInfo valueTypeInfo = valueType.accept(new 
TypeInfoLogicalTypeVisitor());
+                       if (null == keyTypeInfo || null == valueTypeInfo) {
+                               return null;
+                       } else {
+                               return 
TypeInfoFactory.getMapTypeInfo(keyTypeInfo, valueTypeInfo);
+                       }
+               }
+
+               @Override
+               public TypeInfo visit(RowType rowType) {
+                       List<String> names = rowType.getFieldNames();
+                       List<TypeInfo> typeInfos = new 
ArrayList<>(names.size());
+                       for (String name : names) {
+                               TypeInfo typeInfo =
+                                               
rowType.getTypeAt(rowType.getFieldIndex(name)).accept(new 
TypeInfoLogicalTypeVisitor());
+                               if (null != typeInfo) {
+                                       typeInfos.add(typeInfo);
+                               } else {
+                                       return null;
+                               }
+                       }
+                       return TypeInfoFactory.getStructTypeInfo(names, 
typeInfos);
+               }
+
+               @Override
+               public TypeInfo visit(DistinctType distinctType) {
 
 Review comment:
   One last suggestion: instead of returning null a dozen times, you could 
simply use the provided `LogicalTypeDefaultVisitor` and throw the exception in 
its `defaultMethod`.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to