xuefuz commented on a change in pull request #9239: [FLINK-13385] Align Hive data type mapping with FLIP-37
URL: https://github.com/apache/flink/pull/9239#discussion_r308349596
##########
File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTypeUtil.java
##########
@@ -85,65 +86,68 @@ public static TypeInfo toHiveTypeInfo(DataType dataType) {
 		LogicalTypeRoot type = dataType.getLogicalType().getTypeRoot();
 		if (dataType instanceof AtomicDataType) {
-			if (type.equals(LogicalTypeRoot.BOOLEAN)) {
-				return TypeInfoFactory.booleanTypeInfo;
-			} else if (type.equals(LogicalTypeRoot.TINYINT)) {
-				return TypeInfoFactory.byteTypeInfo;
-			} else if (type.equals(LogicalTypeRoot.SMALLINT)) {
-				return TypeInfoFactory.shortTypeInfo;
-			} else if (type.equals(LogicalTypeRoot.INTEGER)) {
-				return TypeInfoFactory.intTypeInfo;
-			} else if (type.equals(LogicalTypeRoot.BIGINT)) {
-				return TypeInfoFactory.longTypeInfo;
-			} else if (type.equals(LogicalTypeRoot.FLOAT)) {
-				return TypeInfoFactory.floatTypeInfo;
-			} else if (type.equals(LogicalTypeRoot.DOUBLE)) {
-				return TypeInfoFactory.doubleTypeInfo;
-			} else if (type.equals(LogicalTypeRoot.DATE)) {
-				return TypeInfoFactory.dateTypeInfo;
-			} else if (type.equals(LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE)) {
-				return TypeInfoFactory.timestampTypeInfo;
-			} else if (type.equals(LogicalTypeRoot.BINARY) || type.equals(LogicalTypeRoot.VARBINARY)) {
-				// Hive doesn't support variable-length binary string
-				return TypeInfoFactory.binaryTypeInfo;
-			} else if (type.equals(LogicalTypeRoot.CHAR)) {
-				CharType charType = (CharType) dataType.getLogicalType();
-
-				if (charType.getLength() > HiveChar.MAX_CHAR_LENGTH) {
-					throw new CatalogException(
-						String.format("HiveCatalog doesn't support char type with length of '%d'. " +
-							"The maximum length is %d",
-							charType.getLength(), HiveChar.MAX_CHAR_LENGTH));
+			switch (type) {
+				case BOOLEAN:
+					return TypeInfoFactory.booleanTypeInfo;
+				case TINYINT:
+					return TypeInfoFactory.byteTypeInfo;
+				case SMALLINT:
+					return TypeInfoFactory.shortTypeInfo;
+				case INTEGER:
+					return TypeInfoFactory.intTypeInfo;
+				case BIGINT:
+					return TypeInfoFactory.longTypeInfo;
+				case FLOAT:
+					return TypeInfoFactory.floatTypeInfo;
+				case DOUBLE:
+					return TypeInfoFactory.doubleTypeInfo;
+				case DATE:
+					return TypeInfoFactory.dateTypeInfo;
+				case TIMESTAMP_WITHOUT_TIME_ZONE:
+					return TypeInfoFactory.timestampTypeInfo;
+				case CHAR: {
+					CharType charType = (CharType) dataType.getLogicalType();
+					if (charType.getLength() > HiveChar.MAX_CHAR_LENGTH) {
+						throw new CatalogException(
+							String.format("HiveCatalog doesn't support char type with length of '%d'. " +
+								"The maximum length is %d",
+								charType.getLength(), HiveChar.MAX_CHAR_LENGTH));
+					}
+					return TypeInfoFactory.getCharTypeInfo(charType.getLength());
 				}
-
-				return TypeInfoFactory.getCharTypeInfo(charType.getLength());
-			} else if (type.equals(LogicalTypeRoot.VARCHAR)) {
-				VarCharType varCharType = (VarCharType) dataType.getLogicalType();
-
-				// Flink's StringType is defined as VARCHAR(Integer.MAX_VALUE)
-				// We don't have more information in LogicalTypeRoot to distinguish StringType and a VARCHAR(Integer.MAX_VALUE) instance
-				// Thus always treat VARCHAR(Integer.MAX_VALUE) as StringType
-				if (varCharType.getLength() == Integer.MAX_VALUE) {
-					return TypeInfoFactory.stringTypeInfo;
+				case VARCHAR: {
+					VarCharType varCharType = (VarCharType) dataType.getLogicalType();
+					// Flink's StringType is defined as VARCHAR(Integer.MAX_VALUE)
+					// We don't have more information in LogicalTypeRoot to distinguish StringType and a VARCHAR(Integer.MAX_VALUE) instance
+					// Thus always treat VARCHAR(Integer.MAX_VALUE) as StringType
+					if (varCharType.getLength() == Integer.MAX_VALUE) {
+						return TypeInfoFactory.stringTypeInfo;
+					}
+					if (varCharType.getLength() > HiveVarchar.MAX_VARCHAR_LENGTH) {
+						throw new CatalogException(
+							String.format("HiveCatalog doesn't support varchar type with length of '%d'. " +
								"The maximum length is %d",
								varCharType.getLength(), HiveVarchar.MAX_VARCHAR_LENGTH));
+					}
+					return TypeInfoFactory.getVarcharTypeInfo(varCharType.getLength());
 				}
-
-				if (varCharType.getLength() > HiveVarchar.MAX_VARCHAR_LENGTH) {
-					throw new CatalogException(
-						String.format("HiveCatalog doesn't support varchar type with length of '%d'. " +
-							"The maximum length is %d",
-							varCharType.getLength(), HiveVarchar.MAX_VARCHAR_LENGTH));
+				case DECIMAL: {
+					DecimalType decimalType = (DecimalType) dataType.getLogicalType();
+					// Flink and Hive share the same precision and scale range
+					// Flink already validates the type so we don't need to validate again here
+					return TypeInfoFactory.getDecimalTypeInfo(decimalType.getPrecision(), decimalType.getScale());
 				}
-
-				return TypeInfoFactory.getVarcharTypeInfo(varCharType.getLength());
-			} else if (type.equals(LogicalTypeRoot.DECIMAL)) {
-				DecimalType decimalType = (DecimalType) dataType.getLogicalType();
-
-				// Flink and Hive share the same precision and scale range
-				// Flink already validates the type so we don't need to validate again here
-				return TypeInfoFactory.getDecimalTypeInfo(decimalType.getPrecision(), decimalType.getScale());
+				case VARBINARY: {
+					VarBinaryType varBinaryType = (VarBinaryType) dataType.getLogicalType();
+					if (varBinaryType.getLength() == VarBinaryType.MAX_LENGTH) {
Review comment:
Nit: maybe we can add a comment saying the type is actually DataTypes.BYTES,
similar to what is done for VARCHAR.
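For context, a sketch of how the VARBINARY branch could look with such a comment, mirroring the VARCHAR case above; the comment wording and the handling of bounded varbinary lengths are assumptions for illustration, not the PR's actual code:

				case VARBINARY: {
					VarBinaryType varBinaryType = (VarBinaryType) dataType.getLogicalType();
					// Flink's BytesType (DataTypes.BYTES()) is defined as VARBINARY(Integer.MAX_VALUE),
					// so treat a max-length VARBINARY as Hive's BINARY type,
					// analogous to mapping VARCHAR(Integer.MAX_VALUE) to Hive STRING above.
					if (varBinaryType.getLength() == VarBinaryType.MAX_LENGTH) {
						return TypeInfoFactory.binaryTypeInfo;
					}
					// How bounded varbinary lengths are handled is not visible in this excerpt;
					// rejecting them is only one possible choice.
					throw new CatalogException(
						String.format("HiveCatalog doesn't support varbinary type with length of '%d'",
							varBinaryType.getLength()));
				}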