This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new c64e850  [FLINK-12973] Support reading Hive complex types like array and map
c64e850 is described below

commit c64e8503c9f093c48d119e1dce9f64eea9843069
Author: zjuwangg <zjuwa...@foxmail.com>
AuthorDate: Mon Jul 1 18:18:53 2019 +0800

    [FLINK-12973] Support reading Hive complex types like array and map
    
    This PR adds support for reading Hive complex types like array and map.
    
    This closes #8935.
---
 .../batch/connectors/hive/HiveRecordSerDe.java     | 93 ----------------------
 .../connectors/hive/HiveTableInputFormat.java      |  5 +-
 .../functions/hive/conversion/HiveInspectors.java  |  5 +-
 .../batch/connectors/hive/HiveInputFormatTest.java | 53 ++++++++++++
 .../src/test/resources/complex_test/1.txt          |  1 +
 5 files changed, 59 insertions(+), 98 deletions(-)
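
For readers skimming the change: the heart of it is that HiveTableInputFormat now delegates per-field conversion to HiveInspectors.toFlinkObject, which recurses through Hive ObjectInspectors. Below is a minimal sketch of that style of recursion, written only to illustrate this email; the class name ComplexTypeConversionSketch is made up, and the committed method handles more cases (writables, unions, char/varchar stripping, decimals) than shown here.

// Illustrative sketch only; not the committed HiveInspectors implementation.
import org.apache.flink.types.Row;

import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ComplexTypeConversionSketch {

	public static Object toFlinkObject(ObjectInspector inspector, Object data) {
		if (data == null) {
			return null;
		}
		if (inspector instanceof PrimitiveObjectInspector) {
			// Scalars: let the primitive inspector produce the plain Java value.
			return ((PrimitiveObjectInspector) inspector).getPrimitiveJavaObject(data);
		}
		if (inspector instanceof ListObjectInspector) {
			// Hive ARRAY -> Java array, converting each element recursively.
			ListObjectInspector listInspector = (ListObjectInspector) inspector;
			List<?> list = listInspector.getList(data);
			Object[] result = new Object[list.size()];
			for (int i = 0; i < list.size(); i++) {
				result[i] = toFlinkObject(listInspector.getListElementObjectInspector(), list.get(i));
			}
			return result;
		}
		if (inspector instanceof MapObjectInspector) {
			// Hive MAP -> java.util.Map, converting keys and values recursively.
			MapObjectInspector mapInspector = (MapObjectInspector) inspector;
			Map<Object, Object> result = new HashMap<>();
			for (Map.Entry<?, ?> entry : mapInspector.getMap(data).entrySet()) {
				result.put(
						toFlinkObject(mapInspector.getMapKeyObjectInspector(), entry.getKey()),
						toFlinkObject(mapInspector.getMapValueObjectInspector(), entry.getValue()));
			}
			return result;
		}
		if (inspector instanceof StructObjectInspector) {
			// Hive STRUCT -> Flink Row, one field per struct member.
			StructObjectInspector structInspector = (StructObjectInspector) inspector;
			List<? extends StructField> fields = structInspector.getAllStructFieldRefs();
			Row row = new Row(fields.size());
			for (int i = 0; i < fields.size(); i++) {
				StructField field = fields.get(i);
				row.setField(i, toFlinkObject(
						field.getFieldObjectInspector(),
						structInspector.getStructFieldData(data, field)));
			}
			return row;
		}
		throw new IllegalArgumentException("Unsupported Hive category: " + inspector.getCategory());
	}
}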

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveRecordSerDe.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveRecordSerDe.java
deleted file mode 100644
index be046d9..0000000
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveRecordSerDe.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.batch.connectors.hive;
-
-import org.apache.hadoop.hive.common.type.HiveChar;
-import org.apache.hadoop.hive.common.type.HiveVarchar;
-import org.apache.hadoop.hive.serde2.SerDeException;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.DateObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveCharObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveVarcharObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector;
-
-import java.math.BigDecimal;
-import java.sql.Date;
-import java.sql.Timestamp;
-
-/**
- * Class used to serialize to and from raw hdfs file type.
- * Highly inspired by HCatRecordSerDe (almost copied from that class) in hive-catalog-core.
- */
-public class HiveRecordSerDe {
-
-       /**
-        * Return underlying Java Object from an object-representation
-        * that is readable by a provided ObjectInspector.
-        */
-	public static Object obtainFlinkRowField(Object field, ObjectInspector fieldObjectInspector) {
-		Object res;
-		if (fieldObjectInspector.getCategory() == ObjectInspector.Category.PRIMITIVE) {
-			res = convertPrimitiveField(field, (PrimitiveObjectInspector) fieldObjectInspector);
-		} else {
-			throw new FlinkHiveException(new SerDeException(
-					String.format("HiveRecordSerDe doesn't support category %s, type %s yet",
-					fieldObjectInspector.getCategory(), fieldObjectInspector.getTypeName())));
-		}
-		return res;
-	}
-
-	/**
-	 * Converts a Java object of one of Hive's scalar data types to the corresponding object of Flink's internal data types.
-	 *
-	 * @param field field value
-	 * @param primitiveObjectInspector Hive's primitive object inspector for the field
-	 * @return the Java object conforming to Flink's internal data types.
-	 *
-	 * TODO: Compared to the original HCatRecordSerDe.java, we may need to add more type converters according to conf.
-	 */
-	private static Object convertPrimitiveField(Object field, PrimitiveObjectInspector primitiveObjectInspector) {
-		if (field == null) {
-			return null;
-		}
-
-		switch (primitiveObjectInspector.getPrimitiveCategory()) {
-			case DECIMAL:
-				HiveDecimalObjectInspector decimalOI = (HiveDecimalObjectInspector) primitiveObjectInspector;
-				BigDecimal bigDecimal = decimalOI.getPrimitiveJavaObject(field).bigDecimalValue();
-				return bigDecimal;
-			case TIMESTAMP:
-				Timestamp ts = ((TimestampObjectInspector) primitiveObjectInspector).getPrimitiveJavaObject(field);
-				return ts;
-			case DATE:
-				Date date = ((DateObjectInspector) primitiveObjectInspector).getPrimitiveWritableObject(field).get();
-				return date;
-			case CHAR:
-				HiveChar c = ((HiveCharObjectInspector) primitiveObjectInspector).getPrimitiveJavaObject(field);
-				return c.getStrippedValue();
-			case VARCHAR:
-				HiveVarchar vc = ((HiveVarcharObjectInspector) primitiveObjectInspector).getPrimitiveJavaObject(field);
-				return vc.getValue();
-			default:
-				return primitiveObjectInspector.getPrimitiveJavaObject(field);
-		}
-	}
-}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableInputFormat.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableInputFormat.java
index be6f92a..755e62c 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableInputFormat.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableInputFormat.java
@@ -29,6 +29,7 @@ import org.apache.flink.core.io.InputSplitAssigner;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.hive.util.HiveTableUtil;
+import org.apache.flink.table.functions.hive.conversion.HiveInspectors;
 import org.apache.flink.types.Row;
 
 import org.apache.hadoop.conf.Configurable;
@@ -218,8 +219,8 @@ public class HiveTableInputFormat extends HadoopInputFormatCommonBase<Row, HiveT
                        int index = 0;
                        for (; index < structFields.size(); index++) {
 				StructField structField = structFields.get(index);
-				Object object = HiveRecordSerDe.obtainFlinkRowField(
-						structObjectInspector.getStructFieldData(hiveRowStruct, structField), structField.getFieldObjectInspector());
+				Object object = HiveInspectors.toFlinkObject(structField.getFieldObjectInspector(),
+						structObjectInspector.getStructFieldData(hiveRowStruct, structField));
                                row.setField(index, object);
                        }
                        for (String partition : partitionColNames){
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
index 85ca033..2ef5ee0 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
@@ -49,7 +49,6 @@ import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.BinaryObjectInspector;
@@ -332,8 +331,8 @@ public class HiveInspectors {
                        return result;
                }
 
-		if (inspector instanceof StandardStructObjectInspector) {
-			StandardStructObjectInspector structInspector = (StandardStructObjectInspector) inspector;
+		if (inspector instanceof StructObjectInspector) {
+			StructObjectInspector structInspector = (StructObjectInspector) inspector;
 
 			List<? extends StructField> fields = structInspector.getAllStructFieldRefs();
 
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveInputFormatTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveInputFormatTest.java
index 5560872..c0f284c 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveInputFormatTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveInputFormatTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.ObjectPath;
@@ -31,6 +32,7 @@ import org.apache.flink.table.catalog.hive.HiveTestUtils;
 import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory;
 import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper;
 import org.apache.flink.table.catalog.hive.util.HiveTableUtil;
+import org.apache.flink.table.types.DataType;
 import org.apache.flink.types.Row;
 
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -124,4 +126,55 @@ public class HiveInputFormatTest {
                Assert.assertEquals("3,3,a,3000,3.33", rows.get(2).toString());
                Assert.assertEquals("4,4,a,4000,4.44", rows.get(3).toString());
        }
+
+	@Test
+	public void testReadComplextDataTypeFromHiveInputFormat() throws Exception {
+		final String dbName = "default";
+		final String tblName = "complext_test";
+
+		TableSchema.Builder builder = new TableSchema.Builder();
+		builder.fields(new String[]{"a", "m", "s"}, new DataType[]{
+				DataTypes.ARRAY(DataTypes.INT()),
+				DataTypes.MAP(DataTypes.INT(), DataTypes.STRING()),
+				DataTypes.ROW(DataTypes.FIELD("f1", DataTypes.INT()), DataTypes.FIELD("f2", DataTypes.STRING()))});
+
+		// Create the Hive table via the metastore client instead of HiveCatalog,
+		// because HiveCatalog doesn't support setting a SerDe yet.
+		HiveMetastoreClientWrapper client = HiveMetastoreClientFactory.create(hiveConf, null);
+		org.apache.hadoop.hive.metastore.api.Table tbl = new org.apache.hadoop.hive.metastore.api.Table();
+		tbl.setDbName(dbName);
+		tbl.setTableName(tblName);
+		tbl.setCreateTime((int) (System.currentTimeMillis() / 1000));
+		tbl.setParameters(new HashMap<>());
+		StorageDescriptor sd = new StorageDescriptor();
+		String location = HiveInputFormatTest.class.getResource("/complex_test").getPath();
+		sd.setLocation(location);
+		sd.setInputFormat(DEFAULT_HIVE_INPUT_FORMAT_TEST_INPUT_FORMAT_CLASS);
+		sd.setOutputFormat(DEFAULT_OUTPUT_FORMAT_CLASS);
+		sd.setSerdeInfo(new SerDeInfo());
+		sd.getSerdeInfo().setSerializationLib(DEFAULT_HIVE_INPUT_FORMAT_TEST_SERDE_CLASS);
+		sd.getSerdeInfo().setParameters(new HashMap<>());
+		sd.getSerdeInfo().getParameters().put("serialization.format", "1");
+		sd.getSerdeInfo().getParameters().put("field.delim", ";");
+		// org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe uses 'colelction.delim' as its collection
+		// delimiter config key; the misspelling comes from that class.
+		sd.getSerdeInfo().getParameters().put("colelction.delim", ",");
+		sd.getSerdeInfo().getParameters().put("mapkey.delim", ":");
+		sd.setCols(HiveTableUtil.createHiveColumns(builder.build()));
+		tbl.setSd(sd);
+		tbl.setPartitionKeys(new ArrayList<>());
+
+		client.createTable(tbl);
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(1);
+		RowTypeInfo rowTypeInfo = new RowTypeInfo(builder.build().getFieldTypes(), builder.build().getFieldNames());
+		List<HiveTablePartition> partitions = new ArrayList<>();
+		partitions.add(new HiveTablePartition(sd, new HashMap<>()));
+		HiveTableInputFormat hiveTableInputFormat = new HiveTableInputFormat(new JobConf(hiveConf), false, null,
+				partitions, rowTypeInfo);
+		DataSet<Row> rowDataSet = env.createInput(hiveTableInputFormat);
+		List<Row> rows = rowDataSet.collect();
+		Assert.assertEquals(1, rows.size());
+		Assert.assertEquals("[1, 2, 3],{1=a, 2=b},3,c", rows.get(0).toString());
+	}
 }
diff --git a/flink-connectors/flink-connector-hive/src/test/resources/complex_test/1.txt b/flink-connectors/flink-connector-hive/src/test/resources/complex_test/1.txt
new file mode 100644
index 0000000..bcb199b
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/test/resources/complex_test/1.txt
@@ -0,0 +1 @@
+1,2,3;1:a,2:b;3,c
\ No newline at end of file
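
As a reading aid for the fixture above, here is a small hand-rolled illustration (not LazySimpleSerDe code) of how the delimiters configured in the test, field.delim=';', the collection delimiter ',', and mapkey.delim=':', split that single line into the array, map, and struct columns. The class name ComplexTestFixtureLayout is made up for this example.

// Illustrative only: a hand-rolled split mirroring the test's delimiter configuration.
import java.util.Arrays;

public class ComplexTestFixtureLayout {
	public static void main(String[] args) {
		String line = "1,2,3;1:a,2:b;3,c";
		String[] fields = line.split(";");          // field.delim ';'  -> columns a | m | s
		String[] array = fields[0].split(",");      // column a: ARRAY<INT>              -> [1, 2, 3]
		String[] mapEntries = fields[1].split(","); // column m: MAP<INT, STRING>        -> {1=a, 2=b}, each entry splits on ':'
		String[] struct = fields[2].split(",");     // column s: ROW<f1 INT, f2 STRING>  -> (3, c)
		System.out.println(Arrays.toString(array));
		System.out.println(Arrays.toString(mapEntries));
		System.out.println(Arrays.toString(struct));
	}
}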
