This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
     new c64e850  [FLINK-12973]Support reading Hive complex types like array and map
c64e850 is described below

commit c64e8503c9f093c48d119e1dce9f64eea9843069
Author: zjuwangg <zjuwa...@foxmail.com>
AuthorDate: Mon Jul 1 18:18:53 2019 +0800

    [FLINK-12973]Support reading Hive complex types like array and map

    This PR adds support for reading Hive complex types like array and map.

    This closes #8935.
---
 .../batch/connectors/hive/HiveRecordSerDe.java     | 93 ----------------------
 .../connectors/hive/HiveTableInputFormat.java      |  5 +-
 .../functions/hive/conversion/HiveInspectors.java  |  5 +-
 .../batch/connectors/hive/HiveInputFormatTest.java | 53 ++++++++++++
 .../src/test/resources/complex_test/1.txt          |  1 +
 5 files changed, 59 insertions(+), 98 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveRecordSerDe.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveRecordSerDe.java
deleted file mode 100644
index be046d9..0000000
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveRecordSerDe.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.batch.connectors.hive;
-
-import org.apache.hadoop.hive.common.type.HiveChar;
-import org.apache.hadoop.hive.common.type.HiveVarchar;
-import org.apache.hadoop.hive.serde2.SerDeException;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.DateObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveCharObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveVarcharObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector;
-
-import java.math.BigDecimal;
-import java.sql.Date;
-import java.sql.Timestamp;
-
-/**
- * Class used to serialize to and from raw hdfs file type.
- * Highly inspired by HCatRecordSerDe (almost copied from this class) in hive-catalog-core.
- */
-public class HiveRecordSerDe {
-
-	/**
-	 * Return underlying Java Object from an object-representation
-	 * that is readable by a provided ObjectInspector.
-	 */
-	public static Object obtainFlinkRowField(Object field, ObjectInspector fieldObjectInspector) {
-		Object res;
-		if (fieldObjectInspector.getCategory() == ObjectInspector.Category.PRIMITIVE) {
-			res = convertPrimitiveField(field, (PrimitiveObjectInspector) fieldObjectInspector);
-		} else {
-			throw new FlinkHiveException(new SerDeException(
-				String.format("HiveRecordSerDe doesn't support category %s, type %s yet",
-					fieldObjectInspector.getCategory(), fieldObjectInspector.getTypeName())));
-		}
-		return res;
-	}
-
-	/**
-	 * This method converts java objects of Hive's scalar data types to those of Flink's internal data types.
-	 *
-	 * @param field field value
-	 * @param primitiveObjectInspector Hive's primitive object inspector for the field
-	 * @return the java object conforming to Flink's internal data types.
-	 *
-	 * TODO: Compared to the original HCatRecordSerDe.java, we may need to add more type converters according to conf.
-	 */
-	private static Object convertPrimitiveField(Object field, PrimitiveObjectInspector primitiveObjectInspector) {
-		if (field == null) {
-			return null;
-		}
-
-		switch (primitiveObjectInspector.getPrimitiveCategory()) {
-			case DECIMAL:
-				HiveDecimalObjectInspector decimalOI = (HiveDecimalObjectInspector) primitiveObjectInspector;
-				BigDecimal bigDecimal = decimalOI.getPrimitiveJavaObject(field).bigDecimalValue();
-				return bigDecimal;
-			case TIMESTAMP:
-				Timestamp ts = ((TimestampObjectInspector) primitiveObjectInspector).getPrimitiveJavaObject(field);
-				return ts;
-			case DATE:
-				Date date = ((DateObjectInspector) primitiveObjectInspector).getPrimitiveWritableObject(field).get();
-				return date;
-			case CHAR:
-				HiveChar c = ((HiveCharObjectInspector) primitiveObjectInspector).getPrimitiveJavaObject(field);
-				return c.getStrippedValue();
-			case VARCHAR:
-				HiveVarchar vc = ((HiveVarcharObjectInspector) primitiveObjectInspector).getPrimitiveJavaObject(field);
-				return vc.getValue();
-			default:
-				return primitiveObjectInspector.getPrimitiveJavaObject(field);
-		}
-	}
-}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableInputFormat.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableInputFormat.java
index be6f92a..755e62c 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableInputFormat.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableInputFormat.java
@@ -29,6 +29,7 @@ import org.apache.flink.core.io.InputSplitAssigner;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.hive.util.HiveTableUtil;
+import org.apache.flink.table.functions.hive.conversion.HiveInspectors;
 import org.apache.flink.types.Row;
 
 import org.apache.hadoop.conf.Configurable;
@@ -218,8 +219,8 @@ public class HiveTableInputFormat extends HadoopInputFormatCommonBase<Row, HiveT
 		int index = 0;
 		for (; index < structFields.size(); index++) {
 			StructField structField = structFields.get(index);
-			Object object = HiveRecordSerDe.obtainFlinkRowField(
-				structObjectInspector.getStructFieldData(hiveRowStruct, structField), structField.getFieldObjectInspector());
+			Object object = HiveInspectors.toFlinkObject(structField.getFieldObjectInspector(),
+				structObjectInspector.getStructFieldData(hiveRowStruct, structField));
 			row.setField(index, object);
 		}
 		for (String partition : partitionColNames){
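Note: the swap to HiveInspectors.toFlinkObject above is what enables complex types. Where the removed HiveRecordSerDe only handled PRIMITIVE inspectors, toFlinkObject dispatches on the ObjectInspector and recurses into the element inspectors of LIST, MAP and STRUCT data. Below is a minimal, self-contained sketch of that conversion pattern; it is illustrative only, and the class and method names are made up rather than taken from the actual HiveInspectors code:

import org.apache.flink.types.Row;

import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class InspectorConversionSketch {

	/** Recursively converts Hive data to Flink objects, driven by the inspector tree. */
	public static Object toFlinkObjectSketch(ObjectInspector inspector, Object data) {
		if (data == null) {
			return null;
		}
		if (inspector instanceof PrimitiveObjectInspector) {
			// Unwrap Hive's writable/lazy representation into a plain Java object.
			return ((PrimitiveObjectInspector) inspector).getPrimitiveJavaObject(data);
		}
		if (inspector instanceof ListObjectInspector) {
			// ARRAY<T>: convert each element with the element inspector.
			ListObjectInspector listInspector = (ListObjectInspector) inspector;
			ObjectInspector elementInspector = listInspector.getListElementObjectInspector();
			List<?> list = listInspector.getList(data);
			Object[] result = new Object[list.size()];
			for (int i = 0; i < list.size(); i++) {
				result[i] = toFlinkObjectSketch(elementInspector, list.get(i));
			}
			return result;
		}
		if (inspector instanceof MapObjectInspector) {
			// MAP<K, V>: convert keys and values with their respective inspectors.
			MapObjectInspector mapInspector = (MapObjectInspector) inspector;
			Map<Object, Object> result = new HashMap<>();
			for (Map.Entry<?, ?> entry : mapInspector.getMap(data).entrySet()) {
				result.put(
					toFlinkObjectSketch(mapInspector.getMapKeyObjectInspector(), entry.getKey()),
					toFlinkObjectSketch(mapInspector.getMapValueObjectInspector(), entry.getValue()));
			}
			return result;
		}
		if (inspector instanceof StructObjectInspector) {
			// STRUCT<...>: convert each field into a position of a Flink Row.
			StructObjectInspector structInspector = (StructObjectInspector) inspector;
			List<? extends StructField> fields = structInspector.getAllStructFieldRefs();
			Row row = new Row(fields.size());
			for (int i = 0; i < fields.size(); i++) {
				StructField field = fields.get(i);
				row.setField(i, toFlinkObjectSketch(
					field.getFieldObjectInspector(),
					structInspector.getStructFieldData(data, field)));
			}
			return row;
		}
		throw new IllegalArgumentException("Unsupported inspector: " + inspector.getTypeName());
	}
}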
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
index 85ca033..2ef5ee0 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
@@ -49,7 +49,6 @@ import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.BinaryObjectInspector;
@@ -332,8 +331,8 @@ public class HiveInspectors {
 			return result;
 		}
 
-		if (inspector instanceof StandardStructObjectInspector) {
-			StandardStructObjectInspector structInspector = (StandardStructObjectInspector) inspector;
+		if (inspector instanceof StructObjectInspector) {
+			StructObjectInspector structInspector = (StructObjectInspector) inspector;
 
 			List<? extends StructField> fields = structInspector.getAllStructFieldRefs();
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveInputFormatTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveInputFormatTest.java
index 5560872..c0f284c 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveInputFormatTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveInputFormatTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.ObjectPath;
@@ -31,6 +32,7 @@ import org.apache.flink.table.catalog.hive.HiveTestUtils;
 import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory;
 import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper;
 import org.apache.flink.table.catalog.hive.util.HiveTableUtil;
+import org.apache.flink.table.types.DataType;
 import org.apache.flink.types.Row;
 
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -124,4 +126,55 @@ public class HiveInputFormatTest {
 		Assert.assertEquals("3,3,a,3000,3.33", rows.get(2).toString());
 		Assert.assertEquals("4,4,a,4000,4.44", rows.get(3).toString());
 	}
+
+	@Test
+	public void testReadComplextDataTypeFromHiveInputFormat() throws Exception {
+		final String dbName = "default";
+		final String tblName = "complext_test";
+
+		TableSchema.Builder builder = new TableSchema.Builder();
+		builder.fields(new String[]{"a", "m", "s"}, new DataType[]{
+			DataTypes.ARRAY(DataTypes.INT()),
+			DataTypes.MAP(DataTypes.INT(), DataTypes.STRING()),
+			DataTypes.ROW(DataTypes.FIELD("f1", DataTypes.INT()), DataTypes.FIELD("f2", DataTypes.STRING()))});
+
+		// Create the hive table via the metastore client rather than HiveCatalog, because HiveCatalog
+		// doesn't support setting a serDe yet.
+		HiveMetastoreClientWrapper client = HiveMetastoreClientFactory.create(hiveConf, null);
+		org.apache.hadoop.hive.metastore.api.Table tbl = new org.apache.hadoop.hive.metastore.api.Table();
+		tbl.setDbName(dbName);
+		tbl.setTableName(tblName);
+		tbl.setCreateTime((int) (System.currentTimeMillis() / 1000));
+		tbl.setParameters(new HashMap<>());
+		StorageDescriptor sd = new StorageDescriptor();
+		String location = HiveInputFormatTest.class.getResource("/complex_test").getPath();
+		sd.setLocation(location);
+		sd.setInputFormat(DEFAULT_HIVE_INPUT_FORMAT_TEST_INPUT_FORMAT_CLASS);
+		sd.setOutputFormat(DEFAULT_OUTPUT_FORMAT_CLASS);
+		sd.setSerdeInfo(new SerDeInfo());
+		sd.getSerdeInfo().setSerializationLib(DEFAULT_HIVE_INPUT_FORMAT_TEST_SERDE_CLASS);
+		sd.getSerdeInfo().setParameters(new HashMap<>());
+		sd.getSerdeInfo().getParameters().put("serialization.format", "1");
+		sd.getSerdeInfo().getParameters().put("field.delim", ";");
+		// org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe uses 'colelction.delim' as a delimiter config key;
+		// it appears to be a typo in that class.
+		sd.getSerdeInfo().getParameters().put("colelction.delim", ",");
+		sd.getSerdeInfo().getParameters().put("mapkey.delim", ":");
+		sd.setCols(HiveTableUtil.createHiveColumns(builder.build()));
+		tbl.setSd(sd);
+		tbl.setPartitionKeys(new ArrayList<>());
+
+		client.createTable(tbl);
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(1);
+		RowTypeInfo rowTypeInfo = new RowTypeInfo(builder.build().getFieldTypes(), builder.build().getFieldNames());
+		List<HiveTablePartition> partitions = new ArrayList<>();
+		partitions.add(new HiveTablePartition(sd, new HashMap<>()));
+		HiveTableInputFormat hiveTableInputFormat = new HiveTableInputFormat(new JobConf(hiveConf), false, null,
+			partitions, rowTypeInfo);
+		DataSet<Row> rowDataSet = env.createInput(hiveTableInputFormat);
+		List<Row> rows = rowDataSet.collect();
+		Assert.assertEquals(1, rows.size());
+		Assert.assertEquals("[1, 2, 3],{1=a, 2=b},3,c", rows.get(0).toString());
+	}
 }
diff --git a/flink-connectors/flink-connector-hive/src/test/resources/complex_test/1.txt b/flink-connectors/flink-connector-hive/src/test/resources/complex_test/1.txt
new file mode 100644
index 0000000..bcb199b
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/test/resources/complex_test/1.txt
@@ -0,0 +1 @@
+1,2,3;1:a,2:b;3,c
\ No newline at end of file
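Note: with the serDe properties set in the test (field.delim ";", the misspelled 'colelction.delim' "," that LazySimpleSerDe actually reads, and mapkey.delim ":"), the single line in 1.txt decodes into the array, map and struct columns asserted above. Below is a minimal sketch of that splitting, for illustration only; LazySimpleSerDe performs the real (lazy) parsing:

import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

public class DelimiterSketch {
	public static void main(String[] args) {
		String line = "1,2,3;1:a,2:b;3,c";
		String[] columns = line.split(";");           // field.delim separates columns a, m, s
		String[] array = columns[0].split(",");       // collection delimiter separates array elements
		Map<String, String> map = new LinkedHashMap<>();
		for (String entry : columns[1].split(",")) {  // collection delimiter separates map entries
			String[] kv = entry.split(":");           // mapkey.delim separates key from value
			map.put(kv[0], kv[1]);
		}
		String[] struct = columns[2].split(",");      // struct fields f1, f2 also use the collection delimiter
		System.out.println(Arrays.toString(array) + "," + map + "," + struct[0] + "," + struct[1]);
		// prints: [1, 2, 3],{1=a, 2=b},3,c  (matching the assertion in the test)
	}
}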