This is an automated email from the ASF dual-hosted git repository.
bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 38ee097 [FLINK-12970][hive] Support writing Hive complex types
38ee097 is described below
commit 38ee097a408a7181631103a69c5914aac2fd3ff8
Author: Rui Li <[email protected]>
AuthorDate: Tue Jun 25 20:45:26 2019 +0800
[FLINK-12970][hive] Support writing Hive complex types
This PR supports writing to Hive tables with complex data types.
This closes #8875.
---
.../connectors/hive/HiveTableOutputFormat.java | 23 +++--
.../table/catalog/hive/util/HiveTableUtil.java | 27 -----
.../table/catalog/hive/util/HiveTypeUtil.java | 14 +--
.../functions/hive/conversion/HiveInspectors.java | 36 ++++++-
.../batch/connectors/hive/HiveTableSinkTest.java | 114 ++++++++++++++++++---
5 files changed, 155 insertions(+), 59 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormat.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormat.java
index d0ba4b0..a0fdf56 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormat.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormat.java
@@ -32,6 +32,9 @@ import org.apache.flink.table.catalog.hive.client.HiveShim;
import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
import org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator;
import org.apache.flink.table.catalog.hive.util.HiveTableUtil;
+import org.apache.flink.table.functions.hive.conversion.HiveInspectors;
+import org.apache.flink.table.functions.hive.conversion.HiveObjectConversion;
+import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter;
import org.apache.flink.types.Row;
import org.apache.flink.util.FlinkRuntimeException;
@@ -133,6 +136,9 @@ public class HiveTableOutputFormat extends HadoopOutputFormatCommonBase<Row> imp
private transient String hiveVersion;
+ // to convert Flink object to Hive object
+ private transient HiveObjectConversion[] hiveConversions;
+
public HiveTableOutputFormat(JobConf jobConf, String databaseName, String tableName, List<String> partitionColumns,
        RowTypeInfo rowTypeInfo, HiveTablePartition hiveTablePartition, Properties tableProperties, boolean overwrite) {
@@ -274,21 +280,24 @@ public class HiveTableOutputFormat extends HadoopOutputFormatCommonBase<Row> imp
dynamicPartitionOffset = rowTypeInfo.getArity() - partitionColumns.size() + hiveTablePartition.getPartitionSpec().size();
}
- List<ObjectInspector> objectInspectors = new ArrayList<>();
- for (int i = 0; i < rowTypeInfo.getArity() - partitionColumns.size(); i++) {
- objectInspectors.add(HiveTableUtil.getObjectInspector(LegacyTypeInfoDataTypeConverter.toDataType(rowTypeInfo.getTypeAt(i))));
+ numNonPartitionColumns = isPartitioned ? rowTypeInfo.getArity() - partitionColumns.size() : rowTypeInfo.getArity();
+ hiveConversions = new HiveObjectConversion[numNonPartitionColumns];
+ List<ObjectInspector> objectInspectors = new ArrayList<>(hiveConversions.length);
+ for (int i = 0; i < numNonPartitionColumns; i++) {
+ DataType dataType = LegacyTypeInfoDataTypeConverter.toDataType(rowTypeInfo.getTypeAt(i));
+ ObjectInspector objectInspector = HiveInspectors.getObjectInspector(dataType);
+ objectInspectors.add(objectInspector);
+ hiveConversions[i] = HiveInspectors.getConversion(objectInspector, dataType.getLogicalType());
}
if (!isPartitioned) {
rowObjectInspector = ObjectInspectorFactory.getStandardStructObjectInspector(
Arrays.asList(rowTypeInfo.getFieldNames()),
objectInspectors);
- numNonPartitionColumns = rowTypeInfo.getArity();
} else {
rowObjectInspector = ObjectInspectorFactory.getStandardStructObjectInspector(
Arrays.asList(rowTypeInfo.getFieldNames()).subList(0, rowTypeInfo.getArity() - partitionColumns.size()),
objectInspectors);
- numNonPartitionColumns = rowTypeInfo.getArity() - partitionColumns.size();
}
hiveVersion = jobConf.get(HiveCatalogValidator.CATALOG_HIVE_VERSION, HiveShimLoader.getHiveVersion());
}
@@ -385,11 +394,11 @@ public class HiveTableOutputFormat extends HadoopOutputFormatCommonBase<Row> imp
outputCommitter.commitJob(jobContext);
}
- // converts a Row to a list so that Hive can serialize it
+ // converts a Row to a list of Hive objects so that Hive can serialize it
private Object getConvertedRow(Row record) {
List<Object> res = new ArrayList<>(numNonPartitionColumns);
for (int i = 0; i < numNonPartitionColumns; i++) {
- res.add(record.getField(i));
+ res.add(hiveConversions[i].toHiveObject(record.getField(i)));
}
return res;
}
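
As an aside for readers of this patch: the write path now boils down to one HiveObjectConversion per non-partition column. Below is a minimal, self-contained sketch (not part of the commit; the class name, helper name, and the columnTypes parameter are illustrative) showing how the same conversion step could be reproduced with the HiveInspectors methods used above:

    // Illustrative sketch only: build one inspector + conversion per column and
    // convert a Flink Row field by field, mirroring getConvertedRow() in this patch.
    import org.apache.flink.table.functions.hive.conversion.HiveInspectors;
    import org.apache.flink.table.functions.hive.conversion.HiveObjectConversion;
    import org.apache.flink.table.types.DataType;
    import org.apache.flink.types.Row;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;

    import java.util.ArrayList;
    import java.util.List;

    public class RowConversionSketch {
        public static List<Object> toHiveRow(Row record, DataType[] columnTypes) {
            HiveObjectConversion[] conversions = new HiveObjectConversion[columnTypes.length];
            for (int i = 0; i < columnTypes.length; i++) {
                // same calls HiveTableOutputFormat makes when it opens the format
                ObjectInspector inspector = HiveInspectors.getObjectInspector(columnTypes[i]);
                conversions[i] = HiveInspectors.getConversion(inspector, columnTypes[i].getLogicalType());
            }
            List<Object> hiveRow = new ArrayList<>(record.getArity());
            for (int i = 0; i < record.getArity(); i++) {
                // each field is converted to a Hive-writable object before serialization
                hiveRow.add(conversions[i].toHiveObject(record.getField(i)));
            }
            return hiveRow;
        }
    }
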
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java
index d4ed021..c2597a9 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java
@@ -18,7 +18,6 @@
package org.apache.flink.table.catalog.hive.util;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.types.DataType;
@@ -29,13 +28,8 @@ import org.apache.hadoop.hive.metastore.api.SerDeInfo;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.SerDeUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
-import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -139,25 +133,4 @@ public class HiveTableUtil {
return partition;
}
- /**
-  * Get Hive {@link ObjectInspector} for a Flink {@link TypeInformation}.
-  */
- public static ObjectInspector getObjectInspector(DataType flinkType) throws IOException {
- return getObjectInspector(HiveTypeUtil.toHiveTypeInfo(flinkType));
- }
-
- // TODO: reuse Hive's TypeInfoUtils?
- private static ObjectInspector getObjectInspector(TypeInfo type) throws IOException {
- switch (type.getCategory()) {
-
- case PRIMITIVE:
- PrimitiveTypeInfo primitiveType = (PrimitiveTypeInfo) type;
- return PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(primitiveType);
-
- // TODO: support complex types
- default:
- throw new IOException("Unsupported Hive type category " + type.getCategory());
- }
- }
-
}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTypeUtil.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTypeUtil.java
index aa8ee4b..b9f8a57 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTypeUtil.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTypeUtil.java
@@ -29,6 +29,7 @@ import org.apache.flink.table.types.KeyValueDataType;
import org.apache.flink.table.types.logical.CharType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.hadoop.hive.common.type.HiveChar;
@@ -165,14 +166,15 @@ public class HiveTypeUtil {
}
if (dataType instanceof FieldsDataType) {
- Map<String, DataType> fieldDataTypes = ((FieldsDataType) dataType).getFieldDataTypes();
+ FieldsDataType fieldsDataType = (FieldsDataType) dataType;
+ // need to retrieve field names in order
+ List<String> names = ((RowType) fieldsDataType.getLogicalType()).getFieldNames();
- List<String> names = new ArrayList(fieldDataTypes.size());
- List<TypeInfo> typeInfos = new ArrayList<>(fieldDataTypes.size());
+ Map<String, DataType> nameToType = fieldsDataType.getFieldDataTypes();
+ List<TypeInfo> typeInfos = new ArrayList<>(names.size());
- for (Map.Entry<String, DataType> e : fieldDataTypes.entrySet()) {
- names.add(e.getKey());
- typeInfos.add(toHiveTypeInfo(e.getValue()));
+ for (String name : names) {
+ typeInfos.add(toHiveTypeInfo(nameToType.get(name)));
}
return TypeInfoFactory.getStructTypeInfo(names, typeInfos);
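
For illustration only (not part of the commit): the reordering above means a Flink ROW now maps to a Hive struct whose fields appear in declaration order rather than map-iteration order. A hedged sketch, assuming Hive's usual struct type-name rendering:

    // Sketch: struct field order now follows the ROW declaration.
    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.catalog.hive.util.HiveTypeUtil;
    import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;

    public class RowOrderSketch {
        public static void main(String[] args) {
            TypeInfo structInfo = HiveTypeUtil.toHiveTypeInfo(
                DataTypes.ROW(
                    DataTypes.FIELD("f1", DataTypes.INT()),
                    DataTypes.FIELD("f2", DataTypes.STRING())));
            // Expected (assuming standard Hive formatting): struct<f1:int,f2:string>,
            // with f1 before f2 regardless of how the field map iterates.
            System.out.println(structInfo.getTypeName());
        }
    }
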
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
index 56f0b41..bbd0b55 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
@@ -19,6 +19,8 @@
package org.apache.flink.table.functions.hive.conversion;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.hive.util.HiveTypeUtil;
import org.apache.flink.table.functions.hive.FlinkHiveUDFException;
import org.apache.flink.table.types.DataType;
@@ -44,6 +46,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
@@ -77,7 +80,10 @@ import org.apache.hadoop.hive.serde2.objectinspector.primitive.ShortObjectInspec
import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.VoidObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
@@ -394,14 +400,40 @@ public class HiveInspectors {
return getObjectInspector(typeInfo);
}
+ /**
+ * Get Hive {@link ObjectInspector} for a Flink {@link TypeInformation}.
+ */
+ public static ObjectInspector getObjectInspector(DataType flinkType) {
+ return getObjectInspector(HiveTypeUtil.toHiveTypeInfo(flinkType));
+ }
+
private static ObjectInspector getObjectInspector(TypeInfo type) {
switch (type.getCategory()) {
+
case PRIMITIVE:
PrimitiveTypeInfo primitiveType = (PrimitiveTypeInfo) type;
return PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(primitiveType);
+ case LIST:
+ ListTypeInfo listType = (ListTypeInfo) type;
+ return ObjectInspectorFactory.getStandardListObjectInspector(
+ getObjectInspector(listType.getListElementTypeInfo()));
+ case MAP:
+ MapTypeInfo mapType = (MapTypeInfo) type;
+ return ObjectInspectorFactory.getStandardMapObjectInspector(
+ getObjectInspector(mapType.getMapKeyTypeInfo()), getObjectInspector(mapType.getMapValueTypeInfo()));
+ case STRUCT:
+ StructTypeInfo structType = (StructTypeInfo) type;
+ List<TypeInfo> fieldTypes = structType.getAllStructFieldTypeInfos();
+
+ List<ObjectInspector> fieldInspectors = new ArrayList<ObjectInspector>();
+ for (TypeInfo fieldType : fieldTypes) {
+ fieldInspectors.add(getObjectInspector(fieldType));
+ }
+
+ return ObjectInspectorFactory.getStandardStructObjectInspector(
+ structType.getAllStructFieldNames(), fieldInspectors);
default:
- throw new FlinkHiveUDFException(
- String.format("TypeInfo %s is not supported yet", type));
+ throw new CatalogException("Unsupported Hive type category " + type.getCategory());
}
}
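
Again purely as an illustration (not part of the commit): the new LIST/MAP/STRUCT branches compose inspectors recursively, so a nested type resolves in a single call to the public entry point added above:

    // Sketch: obtain an ObjectInspector for ARRAY<ROW<f1 INT, f2 STRING>>,
    // which exercises the LIST and STRUCT branches recursively.
    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.functions.hive.conversion.HiveInspectors;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;

    public class NestedInspectorSketch {
        public static void main(String[] args) {
            ObjectInspector inspector = HiveInspectors.getObjectInspector(
                DataTypes.ARRAY(
                    DataTypes.ROW(
                        DataTypes.FIELD("f1", DataTypes.INT()),
                        DataTypes.FIELD("f2", DataTypes.STRING()))));
            // A standard list inspector wrapping a standard struct inspector is expected here.
            System.out.println(inspector.getTypeName());
        }
    }
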
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSinkTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSinkTest.java
index c880d54..b8f68a4 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSinkTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSinkTest.java
@@ -18,10 +18,9 @@
package org.apache.flink.batch.connectors.hive;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
@@ -31,6 +30,7 @@ import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.catalog.hive.HiveCatalogPartition;
import org.apache.flink.table.catalog.hive.HiveTestUtils;
+import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;
import org.apache.hadoop.fs.FileSystem;
@@ -50,6 +50,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.stream.IntStream;
import static org.junit.Assert.assertEquals;
@@ -127,21 +128,94 @@ public class HiveTableSinkTest {
hiveCatalog.dropTable(tablePath, false);
}
- private RowTypeInfo createDestTable(String dbName, String tblName, int numPartCols) throws Exception {
+ @Test
+ public void testWriteComplexType() throws Exception {
+ String dbName = "default";
+ String tblName = "dest";
ObjectPath tablePath = new ObjectPath(dbName, tblName);
- TableSchema tableSchema = new TableSchema(
- new String[]{"i", "l", "d", "s"},
- new TypeInformation[]{
- BasicTypeInfo.INT_TYPE_INFO,
- BasicTypeInfo.LONG_TYPE_INFO,
- BasicTypeInfo.DOUBLE_TYPE_INFO,
- BasicTypeInfo.STRING_TYPE_INFO}
- );
+
+ TableSchema.Builder builder = new TableSchema.Builder();
+ builder.fields(new String[]{"a", "m", "s"}, new DataType[]{
+ DataTypes.ARRAY(DataTypes.INT()),
+ DataTypes.MAP(DataTypes.INT(), DataTypes.STRING()),
+ DataTypes.ROW(DataTypes.FIELD("f1", DataTypes.INT()), DataTypes.FIELD("f2", DataTypes.STRING()))});
+
+ RowTypeInfo rowTypeInfo = createDestTable(dbName, tblName, builder.build(), 0);
+ List<Row> toWrite = new ArrayList<>();
+ Row row = new Row(rowTypeInfo.getArity());
+ Object[] array = new Object[]{1, 2, 3};
+ Map<Integer, String> map = new HashMap<Integer, String>() {{
+ put(1, "a");
+ put(2, "b");
+ }};
+ Row struct = new Row(2);
+ struct.setField(0, 3);
+ struct.setField(1, "c");
+
+ row.setField(0, array);
+ row.setField(1, map);
+ row.setField(2, struct);
+ toWrite.add(row);
+
+ ExecutionEnvironment execEnv = ExecutionEnvironment.createLocalEnvironment(1);
+ BatchTableEnvironment tableEnv = BatchTableEnvironment.create(execEnv);
+ tableEnv.registerDataSet("complexSrc", execEnv.fromCollection(toWrite, rowTypeInfo));
+
+ Table hiveTable = hiveCatalog.getHiveTable(tablePath);
+ tableEnv.registerTableSink("complexSink", new HiveTableSink(new JobConf(hiveConf), rowTypeInfo,
+ "default", "dest", HiveCatalog.getFieldNames(hiveTable.getPartitionKeys())));
+ tableEnv.sqlQuery("select * from complexSrc").insertInto("complexSink");
+ execEnv.execute();
+
+ verifyWrittenData(new Path(hiveTable.getSd().getLocation(), "0"), Collections.singletonList("1 2 3,1 a 2 b,3 c"));
+ hiveCatalog.dropTable(tablePath, false);
+
+ // nested complex types
+ builder = new TableSchema.Builder();
+ // array of rows
+ builder.fields(new String[]{"a"}, new DataType[]{DataTypes.ARRAY(
+ DataTypes.ROW(DataTypes.FIELD("f1", DataTypes.INT()), DataTypes.FIELD("f2", DataTypes.STRING())))});
+ rowTypeInfo = createDestTable(dbName, tblName, builder.build(), 0);
+ row = new Row(rowTypeInfo.getArity());
+ array = new Object[3];
+ row.setField(0, array);
+ for (int i = 0; i < array.length; i++) {
+ struct = new Row(2);
+ struct.setField(0, 1 + i);
+ struct.setField(1, String.valueOf((char) ('a' + i)));
+ array[i] = struct;
+ }
+ toWrite.clear();
+ toWrite.add(row);
+
+ tableEnv.registerDataSet("nestedSrc", execEnv.fromCollection(toWrite, rowTypeInfo));
+ hiveTable = hiveCatalog.getHiveTable(tablePath);
+ tableEnv.registerTableSink("nestedSink", new HiveTableSink(new JobConf(hiveConf), rowTypeInfo,
+ "default", "dest", HiveCatalog.getFieldNames(hiveTable.getPartitionKeys())));
+ tableEnv.sqlQuery("select * from nestedSrc").insertInto("nestedSink");
+ execEnv.execute();
+
+ verifyWrittenData(new Path(hiveTable.getSd().getLocation(), "0"), Collections.singletonList("1 a 2 b 3 c"));
+ hiveCatalog.dropTable(tablePath, false);
+ }
+
+ private RowTypeInfo createDestTable(String dbName, String tblName, TableSchema tableSchema, int numPartCols) throws Exception {
CatalogTable catalogTable = createCatalogTable(tableSchema, numPartCols);
- hiveCatalog.createTable(tablePath, catalogTable, false);
+ hiveCatalog.createTable(new ObjectPath(dbName, tblName), catalogTable, false);
return new RowTypeInfo(tableSchema.getFieldTypes(), tableSchema.getFieldNames());
}
+ private RowTypeInfo createDestTable(String dbName, String tblName, int numPartCols) throws Exception {
+ TableSchema.Builder builder = new TableSchema.Builder();
+ builder.fields(new String[]{"i", "l", "d", "s"},
+ new DataType[]{
+ DataTypes.INT(),
+ DataTypes.BIGINT(),
+ DataTypes.DOUBLE(),
+ DataTypes.STRING()});
+ return createDestTable(dbName, tblName, builder.build(), numPartCols);
+ }
+
private CatalogTable createCatalogTable(TableSchema tableSchema, int numPartCols) {
if (numPartCols == 0) {
return new CatalogTableImpl(tableSchema, new HashMap<>(), "");
@@ -165,19 +239,25 @@ public class HiveTableSinkTest {
return res;
}
- private void verifyWrittenData(Path outputFile, List<Row> expected, int numPartCols) throws Exception {
+ private void verifyWrittenData(Path outputFile, List<Row> expectedRows, int numPartCols) throws Exception {
+ int[] fields = IntStream.range(0, expectedRows.get(0).getArity() - numPartCols).toArray();
+ List<String> expected = new ArrayList<>(expectedRows.size());
+ for (Row row : expectedRows) {
+ expected.add(Row.project(row, fields).toString());
+ }
+ verifyWrittenData(outputFile, expected);
+ }
+
+ private void verifyWrittenData(Path outputFile, List<String> expected) throws Exception {
FileSystem fs = outputFile.getFileSystem(hiveConf);
assertTrue(fs.exists(outputFile));
- int[] fields = IntStream.range(0, expected.get(0).getArity() - numPartCols).toArray();
try (BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(outputFile)))) {
int numWritten = 0;
String line = reader.readLine();
while (line != null) {
- Row expectedRow = Row.project(expected.get(numWritten++), fields);
- assertEquals(expectedRow.toString(), line.replaceAll("\u0001", ","));
+ assertEquals(expected.get(numWritten++), line.replaceAll("\u0001", ",").replaceAll("\u0002", " ").replaceAll("\u0003", " "));
line = reader.readLine();
}
- reader.close();
assertEquals(expected.size(), numWritten);
}
}