This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new aa14e7e  [FLINK-13385]Align Hive data type mapping with FLIP-37
aa14e7e is described below

commit aa14e7e3f33cbebdfaeb7e00f7280255cd5fb1f9
Author: zjuwangg <[email protected]>
AuthorDate: Fri Jul 26 20:30:36 2019 +0800

    [FLINK-13385]Align Hive data type mapping with FLIP-37
    
    Align Hive data type mapping with FLIP-37.
    
    This closes #9239.
---
 docs/dev/table/catalog.md                          |  44 ++--
 .../table/catalog/hive/util/HiveTypeUtil.java      | 268 +++++++++++++--------
 .../catalog/hive/HiveCatalogDataTypeTest.java      |  25 +-
 .../catalog/hive/HiveCatalogHiveMetadataTest.java  |   2 +-
 4 files changed, 203 insertions(+), 136 deletions(-)

diff --git a/docs/dev/table/catalog.md b/docs/dev/table/catalog.md
index 0a4dd3d..acccd83 100644
--- a/docs/dev/table/catalog.md
+++ b/docs/dev/table/catalog.md
@@ -157,27 +157,28 @@ Currently `HiveCatalog` supports most Flink data types 
with the following mappin
 
 |  Flink Data Type  |  Hive Data Type  |
 |---|---|
-| CHAR(p)       |  char(p)* |
-| VARCHAR(p)    |  varchar(p)** |
-| STRING        |  string |
-| BOOLEAN       |  boolean |
-| BYTE          |  tinyint |
-| SHORT         |  smallint |
-| INT           |  int |
-| BIGINT        |  long |
-| FLOAT         |  float |
-| DOUBLE        |  double |
-| DECIMAL(p, s) |  decimal(p, s) |
-| DATE          |  date |
-| TIMESTAMP_WITHOUT_TIME_ZONE |  Timestamp |
+| CHAR(p)       |  CHAR(p)* |
+| VARCHAR(p)    |  VARCHAR(p)** |
+| STRING        |  STRING |
+| BOOLEAN       |  BOOLEAN |
+| TINYINT       |  TINYINT |
+| SMALLINT      |  SMALLINT |
+| INT           |  INT |
+| BIGINT        |  BIGINT |
+| FLOAT         |  FLOAT |
+| DOUBLE        |  DOUBLE |
+| DECIMAL(p, s) |  DECIMAL(p, s) |
+| DATE          |  DATE |
+| TIMESTAMP_WITHOUT_TIME_ZONE |  TIMESTAMP |
 | TIMESTAMP_WITH_TIME_ZONE |  N/A |
 | TIMESTAMP_WITH_LOCAL_TIME_ZONE |  N/A |
-| INTERVAL |  N/A |
-| BINARY        |  binary |
-| VARBINARY(p)  |  binary |
-| ARRAY\<E>     |  list\<E> |
-| MAP<K, V>     |  map<K, V> |
-| ROW           |  struct |
+| INTERVAL      |  N/A*** |
+| BINARY        |  N/A |
+| VARBINARY(p)  |  N/A |
+| BYTES         |  BINARY |
+| ARRAY\<E>     |  ARRAY\<E> |
+| MAP\<K, V>    |  MAP\<K, V>**** |
+| ROW           |  STRUCT |
 | MULTISET      |  N/A |
 
 
@@ -189,11 +190,14 @@ The following limitations in Hive's data types impact the 
mapping between Flink
 
 \** maximum length is 65535
 
+\*** Flink's `INTERVAL` type cannot be mapped to Hive's `INTERVAL` type yet.
+
+\**** Hive only allows primitive types as map keys, while a Flink map key can be of any data type.
+
 ## User-configured Catalog
 
 Catalogs are pluggable. Users can develop custom catalogs by implementing the 
`Catalog` interface, which defines a set of APIs for reading and writing 
catalog meta-objects such as database, tables, partitions, views, and functions.
 
-
 Catalog Registration
 --------------------
 
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTypeUtil.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTypeUtil.java
index b9f8a57..68c3ede 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTypeUtil.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTypeUtil.java
@@ -21,16 +21,25 @@ package org.apache.flink.table.catalog.hive.util;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
-import org.apache.flink.table.types.AtomicDataType;
-import org.apache.flink.table.types.CollectionDataType;
 import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.FieldsDataType;
-import org.apache.flink.table.types.KeyValueDataType;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BooleanType;
 import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DateType;
 import org.apache.flink.table.types.logical.DecimalType;
-import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.VarBinaryType;
 import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor;
 
 import org.apache.hadoop.hive.common.type.HiveChar;
 import org.apache.hadoop.hive.common.type.HiveVarchar;
@@ -48,7 +57,6 @@ import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -81,107 +89,8 @@ public class HiveTypeUtil {
         */
        public static TypeInfo toHiveTypeInfo(DataType dataType) {
                checkNotNull(dataType, "type cannot be null");
-
-               LogicalTypeRoot type = dataType.getLogicalType().getTypeRoot();
-
-               if (dataType instanceof AtomicDataType) {
-                       if (type.equals(LogicalTypeRoot.BOOLEAN)) {
-                               return TypeInfoFactory.booleanTypeInfo;
-                       } else if (type.equals(LogicalTypeRoot.TINYINT)) {
-                               return TypeInfoFactory.byteTypeInfo;
-                       } else if (type.equals(LogicalTypeRoot.SMALLINT)) {
-                               return TypeInfoFactory.shortTypeInfo;
-                       } else if (type.equals(LogicalTypeRoot.INTEGER)) {
-                               return TypeInfoFactory.intTypeInfo;
-                       } else if (type.equals(LogicalTypeRoot.BIGINT)) {
-                               return TypeInfoFactory.longTypeInfo;
-                       } else if (type.equals(LogicalTypeRoot.FLOAT)) {
-                               return TypeInfoFactory.floatTypeInfo;
-                       } else if (type.equals(LogicalTypeRoot.DOUBLE)) {
-                               return TypeInfoFactory.doubleTypeInfo;
-                       } else if (type.equals(LogicalTypeRoot.DATE)) {
-                               return TypeInfoFactory.dateTypeInfo;
-                       } else if 
(type.equals(LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE)) {
-                               return TypeInfoFactory.timestampTypeInfo;
-                       } else if (type.equals(LogicalTypeRoot.BINARY) || 
type.equals(LogicalTypeRoot.VARBINARY)) {
-                               // Hive doesn't support variable-length binary 
string
-                               return TypeInfoFactory.binaryTypeInfo;
-                       } else if (type.equals(LogicalTypeRoot.CHAR)) {
-                               CharType charType = (CharType) 
dataType.getLogicalType();
-
-                               if (charType.getLength() > 
HiveChar.MAX_CHAR_LENGTH) {
-                                       throw new CatalogException(
-                                               String.format("HiveCatalog 
doesn't support char type with length of '%d'. " +
-                                                               "The maximum 
length is %d",
-                                                       charType.getLength(), 
HiveChar.MAX_CHAR_LENGTH));
-                               }
-
-                               return 
TypeInfoFactory.getCharTypeInfo(charType.getLength());
-                       } else if (type.equals(LogicalTypeRoot.VARCHAR)) {
-                               VarCharType varCharType = (VarCharType) 
dataType.getLogicalType();
-
-                               // Flink's StringType is defined as 
VARCHAR(Integer.MAX_VALUE)
-                               // We don't have more information in 
LogicalTypeRoot to distringuish StringType and a VARCHAR(Integer.MAX_VALUE) 
instance
-                               // Thus always treat VARCHAR(Integer.MAX_VALUE) 
as StringType
-                               if (varCharType.getLength() == 
Integer.MAX_VALUE) {
-                                       return TypeInfoFactory.stringTypeInfo;
-                               }
-
-                               if (varCharType.getLength() > 
HiveVarchar.MAX_VARCHAR_LENGTH) {
-                                       throw new CatalogException(
-                                               String.format("HiveCatalog 
doesn't support varchar type with length of '%d'. " +
-                                                               "The maximum 
length is %d",
-                                                       
varCharType.getLength(), HiveVarchar.MAX_VARCHAR_LENGTH));
-                               }
-
-                               return 
TypeInfoFactory.getVarcharTypeInfo(varCharType.getLength());
-                       } else if (type.equals(LogicalTypeRoot.DECIMAL)) {
-                               DecimalType decimalType = (DecimalType) 
dataType.getLogicalType();
-
-                               // Flink and Hive share the same precision and 
scale range
-                               // Flink already validates the type so we don't 
need to validate again here
-                               return 
TypeInfoFactory.getDecimalTypeInfo(decimalType.getPrecision(), 
decimalType.getScale());
-                       }
-
-                       // Flink's primitive types that Hive 2.3.4 doesn't 
support: Time, TIMESTAMP_WITH_LOCAL_TIME_ZONE
-               }
-
-               if (dataType instanceof CollectionDataType) {
-
-                       if (type.equals(LogicalTypeRoot.ARRAY)) {
-                               DataType elementType = ((CollectionDataType) 
dataType).getElementDataType();
-
-                               return 
TypeInfoFactory.getListTypeInfo(toHiveTypeInfo(elementType));
-                       }
-
-                       // Flink's collection types that Hive 2.3.4 doesn't 
support: multiset
-               }
-
-               if (dataType instanceof KeyValueDataType) {
-                       KeyValueDataType keyValueDataType = (KeyValueDataType) 
dataType;
-                       DataType keyType = keyValueDataType.getKeyDataType();
-                       DataType valueType = 
keyValueDataType.getValueDataType();
-
-                       return 
TypeInfoFactory.getMapTypeInfo(toHiveTypeInfo(keyType), 
toHiveTypeInfo(valueType));
-               }
-
-               if (dataType instanceof FieldsDataType) {
-                       FieldsDataType fieldsDataType = (FieldsDataType) 
dataType;
-                       // need to retrieve field names in order
-                       List<String> names = ((RowType) 
fieldsDataType.getLogicalType()).getFieldNames();
-
-                       Map<String, DataType> nameToType = 
fieldsDataType.getFieldDataTypes();
-                       List<TypeInfo> typeInfos = new 
ArrayList<>(names.size());
-
-                       for (String name : names) {
-                               
typeInfos.add(toHiveTypeInfo(nameToType.get(name)));
-                       }
-
-                       return TypeInfoFactory.getStructTypeInfo(names, 
typeInfos);
-               }
-
-               throw new UnsupportedOperationException(
-                       String.format("Flink doesn't support converting type %s 
to Hive type yet.", dataType.toString()));
+               LogicalType logicalType = dataType.getLogicalType();
+               return logicalType.accept(new 
TypeInfoLogicalTypeVisitor(dataType));
        }
 
        /**
@@ -269,4 +178,149 @@ public class HiveTypeUtil {
                                        String.format("Flink doesn't support 
Hive primitive type %s yet", hiveType));
                }
        }
+
+       private static class TypeInfoLogicalTypeVisitor extends 
LogicalTypeDefaultVisitor<TypeInfo> {
+               private final DataType dataType;
+
+               public TypeInfoLogicalTypeVisitor(DataType dataType) {
+                       this.dataType = dataType;
+               }
+
+               @Override
+               public TypeInfo visit(CharType charType) {
+                       if (charType.getLength() > HiveChar.MAX_CHAR_LENGTH) {
+                               throw new CatalogException(
+                                               String.format("HiveCatalog 
doesn't support char type with length of '%d'. " +
+                                                                       "The 
maximum length is %d",
+                                                                       
charType.getLength(), HiveChar.MAX_CHAR_LENGTH));
+                       }
+                       return 
TypeInfoFactory.getCharTypeInfo(charType.getLength());
+               }
+
+               @Override
+               public TypeInfo visit(VarCharType varCharType) {
+                       // Flink's StringType is defined as 
VARCHAR(Integer.MAX_VALUE)
+                       // We don't have more information in LogicalTypeRoot to 
distinguish StringType and a VARCHAR(Integer.MAX_VALUE) instance
+                       // Thus always treat VARCHAR(Integer.MAX_VALUE) as 
StringType
+                       if (varCharType.getLength() == Integer.MAX_VALUE) {
+                               return TypeInfoFactory.stringTypeInfo;
+                       }
+                       if (varCharType.getLength() > 
HiveVarchar.MAX_VARCHAR_LENGTH) {
+                               throw new CatalogException(
+                                               String.format("HiveCatalog 
doesn't support varchar type with length of '%d'. " +
+                                                                       "The 
maximum length is %d",
+                                                                       
varCharType.getLength(), HiveVarchar.MAX_VARCHAR_LENGTH));
+                       }
+                       return 
TypeInfoFactory.getVarcharTypeInfo(varCharType.getLength());
+               }
+
+               @Override
+               public TypeInfo visit(BooleanType booleanType) {
+                       return TypeInfoFactory.booleanTypeInfo;
+               }
+
+               @Override
+               public TypeInfo visit(VarBinaryType varBinaryType) {
+                       // Flink's BytesType is defined as 
VARBINARY(Integer.MAX_VALUE)
+                       // We don't have more information in LogicalTypeRoot to 
distinguish BytesType and a VARBINARY(Integer.MAX_VALUE) instance
+                       // Thus always treat VARBINARY(Integer.MAX_VALUE) as 
BytesType
+                       if (varBinaryType.getLength() == 
VarBinaryType.MAX_LENGTH) {
+                               return TypeInfoFactory.binaryTypeInfo;
+                       }
+                       return defaultMethod(varBinaryType);
+               }
+
+               @Override
+               public TypeInfo visit(DecimalType decimalType) {
+                       // Flink and Hive share the same precision and scale 
range
+                       // Flink already validates the type so we don't need to 
validate again here
+                       return 
TypeInfoFactory.getDecimalTypeInfo(decimalType.getPrecision(), 
decimalType.getScale());
+               }
+
+               @Override
+               public TypeInfo visit(TinyIntType tinyIntType) {
+                       return TypeInfoFactory.byteTypeInfo;
+               }
+
+               @Override
+               public TypeInfo visit(SmallIntType smallIntType) {
+                       return TypeInfoFactory.shortTypeInfo;
+               }
+
+               @Override
+               public TypeInfo visit(IntType intType) {
+                       return TypeInfoFactory.intTypeInfo;
+               }
+
+               @Override
+               public TypeInfo visit(BigIntType bigIntType) {
+                       return TypeInfoFactory.longTypeInfo;
+               }
+
+               @Override
+               public TypeInfo visit(FloatType floatType) {
+                       return TypeInfoFactory.floatTypeInfo;
+               }
+
+               @Override
+               public TypeInfo visit(DoubleType doubleType) {
+                       return TypeInfoFactory.doubleTypeInfo;
+               }
+
+               @Override
+               public TypeInfo visit(DateType dateType) {
+                       return TypeInfoFactory.dateTypeInfo;
+               }
+
+               @Override
+               public TypeInfo visit(TimestampType timestampType) {
+                       return TypeInfoFactory.timestampTypeInfo;
+               }
+
+               @Override
+               public TypeInfo visit(ArrayType arrayType) {
+                       LogicalType elementType = arrayType.getElementType();
+                       TypeInfo elementTypeInfo = elementType.accept(new 
TypeInfoLogicalTypeVisitor(dataType));
+                       if (null != elementTypeInfo) {
+                               return 
TypeInfoFactory.getListTypeInfo(elementTypeInfo);
+                       } else {
+                               return defaultMethod(arrayType);
+                       }
+               }
+
+               @Override
+               public TypeInfo visit(MapType mapType) {
+                       LogicalType keyType  = mapType.getKeyType();
+                       LogicalType valueType = mapType.getValueType();
+                       TypeInfo keyTypeInfo = keyType.accept(new 
TypeInfoLogicalTypeVisitor(dataType));
+                       TypeInfo valueTypeInfo = valueType.accept(new 
TypeInfoLogicalTypeVisitor(dataType));
+                       if (null == keyTypeInfo || null == valueTypeInfo) {
+                               return defaultMethod(mapType);
+                       } else {
+                               return 
TypeInfoFactory.getMapTypeInfo(keyTypeInfo, valueTypeInfo);
+                       }
+               }
+
+               @Override
+               public TypeInfo visit(RowType rowType) {
+                       List<String> names = rowType.getFieldNames();
+                       List<TypeInfo> typeInfos = new 
ArrayList<>(names.size());
+                       for (String name : names) {
+                               TypeInfo typeInfo =
+                                               
rowType.getTypeAt(rowType.getFieldIndex(name)).accept(new 
TypeInfoLogicalTypeVisitor(dataType));
+                               if (null != typeInfo) {
+                                       typeInfos.add(typeInfo);
+                               } else {
+                                       return defaultMethod(rowType);
+                               }
+                       }
+                       return TypeInfoFactory.getStructTypeInfo(names, 
typeInfos);
+               }
+
+               @Override
+               protected TypeInfo defaultMethod(LogicalType logicalType) {
+                       throw new UnsupportedOperationException(
+                                       String.format("Flink doesn't support 
converting type %s to Hive type yet.", dataType.toString()));
+               }
+       }
 }
diff --git 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogDataTypeTest.java
 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogDataTypeTest.java
index e9d40fe..b14f1fb 100644
--- 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogDataTypeTest.java
+++ 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogDataTypeTest.java
@@ -29,7 +29,6 @@ import org.apache.flink.table.catalog.config.CatalogConfig;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.BinaryType;
-import org.apache.flink.table.types.logical.VarBinaryType;
 
 import org.apache.hadoop.hive.common.type.HiveChar;
 import org.apache.hadoop.hive.common.type.HiveVarchar;
@@ -40,7 +39,6 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
-import java.util.Arrays;
 import java.util.HashMap;
 
 import static org.junit.Assert.assertEquals;
@@ -122,20 +120,31 @@ public class HiveCatalogDataTypeTest {
        }
 
        @Test
-       public void testNonExactlyMatchedDataTypes() throws Exception {
+       public void testNonSupportedBinaryDataTypes() throws Exception {
                DataType[] types = new DataType[] {
-                       DataTypes.BINARY(BinaryType.MAX_LENGTH),
-                       DataTypes.VARBINARY(VarBinaryType.MAX_LENGTH)
+                               DataTypes.BINARY(BinaryType.MAX_LENGTH)
                };
 
                CatalogTable table = createCatalogTable(types);
 
                catalog.createDatabase(db1, createDb(), false);
+
+               exception.expect(UnsupportedOperationException.class);
                catalog.createTable(path1, table, false);
+       }
+
+       @Test
+       public void testNonSupportedVarBinaryDataTypes() throws Exception {
+               DataType[] types = new DataType[] {
+                               DataTypes.VARBINARY(20)
+               };
 
-               Arrays.equals(
-                       new DataType[] {DataTypes.BYTES(), DataTypes.BYTES()},
-                       
catalog.getTable(path1).getSchema().getFieldDataTypes());
+               CatalogTable table = createCatalogTable(types);
+
+               catalog.createDatabase(db1, createDb(), false);
+
+               exception.expect(UnsupportedOperationException.class);
+               catalog.createTable(path1, table, false);
        }
 
        @Test
diff --git 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java
 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java
index 95e09b4..0fe10d7 100644
--- 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java
+++ 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java
@@ -87,7 +87,7 @@ public class HiveCatalogHiveMetadataTest extends 
HiveCatalogTestBase {
                                                                                
        .field("fourth", DataTypes.DATE())
                                                                                
        .field("fifth", DataTypes.DOUBLE())
                                                                                
        .field("sixth", DataTypes.BIGINT())
-                                                                               
        .field("seventh", DataTypes.VARBINARY(200))
+                                                                               
        .field("seventh", DataTypes.BYTES())
                                                                                
        .build();
                CatalogTable catalogTable = new CatalogTableImpl(tableSchema, 
getBatchTableProperties(), TEST_COMMENT);
                catalog.createTable(path1, catalogTable, false);

Reply via email to