twalthr commented on a change in pull request #9239: [FLINK-13385] Align Hive
data type mapping with FLIP-37
URL: https://github.com/apache/flink/pull/9239#discussion_r310006617
##########
File path:
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTypeUtil.java
##########
@@ -83,112 +102,15 @@ public static String toHiveTypeName(DataType type) {
public static TypeInfo toHiveTypeInfo(DataType dataType) {
checkNotNull(dataType, "type cannot be null");
- LogicalTypeRoot type = dataType.getLogicalType().getTypeRoot();
-
- if (dataType instanceof AtomicDataType) {
- switch (type) {
- case BOOLEAN:
- return TypeInfoFactory.booleanTypeInfo;
- case TINYINT:
- return TypeInfoFactory.byteTypeInfo;
- case SMALLINT:
- return TypeInfoFactory.shortTypeInfo;
- case INTEGER:
- return TypeInfoFactory.intTypeInfo;
- case BIGINT:
- return TypeInfoFactory.longTypeInfo;
- case FLOAT:
- return TypeInfoFactory.floatTypeInfo;
- case DOUBLE:
- return TypeInfoFactory.doubleTypeInfo;
- case DATE:
- return TypeInfoFactory.dateTypeInfo;
- case TIMESTAMP_WITHOUT_TIME_ZONE:
- return
TypeInfoFactory.timestampTypeInfo;
- case CHAR: {
- CharType charType = (CharType)
dataType.getLogicalType();
- if (charType.getLength() >
HiveChar.MAX_CHAR_LENGTH) {
- throw new CatalogException(
-
String.format("HiveCatalog doesn't support char type with length of '%d'. " +
-
"The maximum length is %d",
-
charType.getLength(), HiveChar.MAX_CHAR_LENGTH));
- }
- return
TypeInfoFactory.getCharTypeInfo(charType.getLength());
- }
- case VARCHAR: {
- VarCharType varCharType = (VarCharType)
dataType.getLogicalType();
- // Flink's StringType is defined as
VARCHAR(Integer.MAX_VALUE)
- // We don't have more information in
LogicalTypeRoot to distinguish StringType and a VARCHAR(Integer.MAX_VALUE)
instance
- // Thus always treat
VARCHAR(Integer.MAX_VALUE) as StringType
- if (varCharType.getLength() ==
Integer.MAX_VALUE) {
- return
TypeInfoFactory.stringTypeInfo;
- }
- if (varCharType.getLength() >
HiveVarchar.MAX_VARCHAR_LENGTH) {
- throw new CatalogException(
-
String.format("HiveCatalog doesn't support varchar type with length of '%d'. " +
-
"The maximum length is %d",
-
varCharType.getLength(), HiveVarchar.MAX_VARCHAR_LENGTH));
- }
- return
TypeInfoFactory.getVarcharTypeInfo(varCharType.getLength());
- }
- case DECIMAL: {
- DecimalType decimalType = (DecimalType)
dataType.getLogicalType();
- // Flink and Hive share the same
precision and scale range
- // Flink already validates the type so
we don't need to validate again here
- return
TypeInfoFactory.getDecimalTypeInfo(decimalType.getPrecision(),
decimalType.getScale());
- }
- case VARBINARY: {
- // Flink's BytesType is defined as
VARBINARY(Integer.MAX_VALUE)
- // We don't have more information in
LogicalTypeRoot to distinguish BytesType and a VARBINARY(Integer.MAX_VALUE)
instance
- // Thus always treat
VARBINARY(Integer.MAX_VALUE) as BytesType
- VarBinaryType varBinaryType =
(VarBinaryType) dataType.getLogicalType();
- if (varBinaryType.getLength() ==
VarBinaryType.MAX_LENGTH) {
- return
TypeInfoFactory.binaryTypeInfo;
- }
- break;
- }
- // Flink's primitive types that Hive 2.3.4
doesn't support: Time, TIMESTAMP_WITH_LOCAL_TIME_ZONE
- default:
- break;
- }
- }
-
- if (dataType instanceof CollectionDataType) {
-
- if (type.equals(LogicalTypeRoot.ARRAY)) {
- DataType elementType = ((CollectionDataType)
dataType).getElementDataType();
-
- return
TypeInfoFactory.getListTypeInfo(toHiveTypeInfo(elementType));
- }
-
- // Flink's collection types that Hive 2.3.4 doesn't
support: multiset
- }
-
- if (dataType instanceof KeyValueDataType) {
- KeyValueDataType keyValueDataType = (KeyValueDataType)
dataType;
- DataType keyType = keyValueDataType.getKeyDataType();
- DataType valueType =
keyValueDataType.getValueDataType();
-
- return
TypeInfoFactory.getMapTypeInfo(toHiveTypeInfo(keyType),
toHiveTypeInfo(valueType));
- }
-
- if (dataType instanceof FieldsDataType) {
- FieldsDataType fieldsDataType = (FieldsDataType)
dataType;
- // need to retrieve field names in order
- List<String> names = ((RowType)
fieldsDataType.getLogicalType()).getFieldNames();
-
- Map<String, DataType> nameToType =
fieldsDataType.getFieldDataTypes();
- List<TypeInfo> typeInfos = new
ArrayList<>(names.size());
-
- for (String name : names) {
-
typeInfos.add(toHiveTypeInfo(nameToType.get(name)));
- }
-
- return TypeInfoFactory.getStructTypeInfo(names,
typeInfos);
+// LogicalTypeRoot type = dataType.getLogicalType().getTypeRoot();
Review comment:
Remove this commented-out line instead of leaving it as dead code?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services