This is an automated email from the ASF dual-hosted git repository.
bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new d1f2d72 [FLINK-12727][hive] Make HiveTableOutputFormat support writing partitioned tables
d1f2d72 is described below
commit d1f2d722588c1e979510b5de5c8c22dc51d126f1
Author: Rui Li <[email protected]>
AuthorDate: Tue Jun 4 20:38:47 2019 +0800
[FLINK-12727][hive] Make HiveTableOutputFormat support writing partitioned tables
This PR adds support for writing partitioned tables (both static and dynamic partitions) in HiveTableOutputFormat.
This closes #8614.
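
A rough usage sketch of the new constructor, for reviewers (illustrative only; hiveConf, stagingSd, rowTypeInfo, tableProperties and record are hypothetical placeholders, while the signature and the open/write/finalize sequence follow the diff and tests below):

    // Sketch: write into the static partition s='a' of default.dest.
    // stagingSd is assumed to be a copy of the table's StorageDescriptor pointing at a staging dir;
    // tableProperties could come from MetaStoreUtils.getTableMetadata(hiveTable), as in the tests.
    JobConf jobConf = new JobConf(hiveConf);
    Map<String, Object> partSpec = new HashMap<>();
    partSpec.put("s", "a");
    HiveTablePartition hiveTablePartition = new HiveTablePartition(stagingSd, partSpec);
    HiveTableOutputFormat outputFormat = new HiveTableOutputFormat(
            jobConf, "default", "dest", Collections.singletonList("s"),
            rowTypeInfo, hiveTablePartition, tableProperties, false);
    outputFormat.open(0, 1);
    outputFormat.writeRecord(record);   // record is a Flink Row matching rowTypeInfo
    outputFormat.close();
    outputFormat.finalizeGlobal(1);     // commits files and registers the partition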
---
.../connectors/hive/HiveTableOutputFormat.java | 125 ++++++++++++++++-----
.../flink/batch/connectors/hive/HiveTableUtil.java | 60 ----------
.../flink/table/catalog/hive/HiveCatalog.java | 16 +--
.../table/catalog/hive/util/HiveTableUtil.java | 45 ++++++++
.../batch/connectors/hive/HiveInputFormatTest.java | 7 +-
.../connectors/hive/HiveTableOutputFormatTest.java | 99 +++++++++++++---
6 files changed, 231 insertions(+), 121 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormat.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormat.java
index 61179fa..5e15d72 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormat.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormat.java
@@ -30,6 +30,7 @@ import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory;
import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper;
import org.apache.flink.table.catalog.hive.client.HiveShim;
import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
+import org.apache.flink.table.catalog.hive.util.HiveTableUtil;
import org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter;
import org.apache.flink.types.Row;
import org.apache.flink.util.FlinkRuntimeException;
@@ -40,7 +41,11 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.common.HiveStatsUtils;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
@@ -81,6 +86,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -98,17 +104,17 @@ public class HiveTableOutputFormat extends HadoopOutputFormatCommonBase<Row> imp
private static final long serialVersionUID = 5167529504848109023L;
private transient JobConf jobConf;
- private transient String dbName;
+ private transient String databaseName;
private transient String tableName;
- private transient List<String> partitionCols;
+ private transient List<String> partitionColumns;
private transient RowTypeInfo rowTypeInfo;
private transient HiveTablePartition hiveTablePartition;
- private transient Properties tblProperties;
+ private transient Properties tableProperties;
private transient boolean overwrite;
private transient boolean isPartitioned;
private transient boolean isDynamicPartition;
// number of non-partitioning columns
- private transient int numNonPartitionCols;
+ private transient int numNonPartitionColumns;
private transient AbstractSerDe serializer;
//StructObjectInspector represents the hive row structure.
@@ -117,32 +123,35 @@ public class HiveTableOutputFormat extends HadoopOutputFormatCommonBase<Row> imp
    private transient TaskAttemptContext context;
    // Maps a partition dir name to the corresponding writer. Used for dynamic partitioning.
-    private transient Map<String, HivePartitionWriter> partitionToWriter;
+    private transient Map<String, HivePartitionWriter> partitionToWriter = new HashMap<>();
// Writer for non-partitioned and static partitioned table
private transient HivePartitionWriter staticWriter;
-    public HiveTableOutputFormat(JobConf jobConf, String dbName, String tableName, List<String> partitionCols,
+    // the offset of dynamic partition columns within a row
+    private transient int dynamicPartitionOffset;
+
+    public HiveTableOutputFormat(JobConf jobConf, String databaseName, String tableName, List<String> partitionColumns,
                            RowTypeInfo rowTypeInfo, HiveTablePartition hiveTablePartition,
-                            Properties tblProperties, boolean overwrite) {
+                            Properties tableProperties, boolean overwrite) {
super(jobConf.getCredentials());
-        Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(dbName), "DB name is empty");
+        Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName), "DB name is empty");
        Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(tableName), "Table name is empty");
        Preconditions.checkNotNull(rowTypeInfo, "RowTypeInfo cannot be null");
        Preconditions.checkNotNull(hiveTablePartition, "HiveTablePartition cannot be null");
-        Preconditions.checkNotNull(tblProperties, "Table properties cannot be null");
+        Preconditions.checkNotNull(tableProperties, "Table properties cannot be null");
HadoopUtils.mergeHadoopConf(jobConf);
this.jobConf = jobConf;
- this.dbName = dbName;
+ this.databaseName = databaseName;
this.tableName = tableName;
- this.partitionCols = partitionCols;
+ this.partitionColumns = partitionColumns;
this.rowTypeInfo = rowTypeInfo;
this.hiveTablePartition = hiveTablePartition;
- this.tblProperties = tblProperties;
+ this.tableProperties = tableProperties;
this.overwrite = overwrite;
-        isPartitioned = partitionCols != null && !partitionCols.isEmpty();
-        isDynamicPartition = isPartitioned && partitionCols.size() > hiveTablePartition.getPartitionSpec().size();
+        isPartitioned = partitionColumns != null && !partitionColumns.isEmpty();
+        isDynamicPartition = isPartitioned && partitionColumns.size() > hiveTablePartition.getPartitionSpec().size();
}
// Custom serialization methods
@@ -155,10 +164,10 @@ public class HiveTableOutputFormat extends HadoopOutputFormatCommonBase<Row> imp
out.writeObject(overwrite);
out.writeObject(rowTypeInfo);
out.writeObject(hiveTablePartition);
- out.writeObject(partitionCols);
- out.writeObject(dbName);
+ out.writeObject(partitionColumns);
+ out.writeObject(databaseName);
out.writeObject(tableName);
- out.writeObject(tblProperties);
+ out.writeObject(tableProperties);
}
@SuppressWarnings("unchecked")
@@ -178,11 +187,11 @@ public class HiveTableOutputFormat extends HadoopOutputFormatCommonBase<Row> imp
overwrite = (boolean) in.readObject();
rowTypeInfo = (RowTypeInfo) in.readObject();
hiveTablePartition = (HiveTablePartition) in.readObject();
- partitionCols = (List<String>) in.readObject();
- dbName = (String) in.readObject();
+ partitionColumns = (List<String>) in.readObject();
+ databaseName = (String) in.readObject();
tableName = (String) in.readObject();
partitionToWriter = new HashMap<>();
- tblProperties = (Properties) in.readObject();
+ tableProperties = (Properties) in.readObject();
}
@Override
@@ -191,12 +200,27 @@ public class HiveTableOutputFormat extends HadoopOutputFormatCommonBase<Row> imp
        Path stagingDir = new Path(jobSD.getLocation());
        FileSystem fs = stagingDir.getFileSystem(jobConf);
        try (HiveMetastoreClientWrapper client = HiveMetastoreClientFactory.create(new HiveConf(jobConf, HiveConf.class))) {
- Table table = client.getTable(dbName, tableName);
+ Table table = client.getTable(databaseName, tableName);
if (!isDynamicPartition) {
commitJob(stagingDir.toString());
}
if (isPartitioned) {
- // TODO: to be implemented
+            if (isDynamicPartition) {
+                FileStatus[] generatedParts = HiveStatsUtils.getFileStatusRecurse(stagingDir,
+                        partitionColumns.size() - hiveTablePartition.getPartitionSpec().size(), fs);
+                for (FileStatus part : generatedParts) {
+                    commitJob(part.getPath().toString());
+                    LinkedHashMap<String, String> fullPartSpec = new LinkedHashMap<>();
+                    Warehouse.makeSpecFromName(fullPartSpec, part.getPath());
+                    loadPartition(part.getPath(), table, fullPartSpec, client);
+                }
+            } else {
+                LinkedHashMap<String, String> partSpec = new LinkedHashMap<>();
+                for (String partCol : hiveTablePartition.getPartitionSpec().keySet()) {
+                    partSpec.put(partCol, hiveTablePartition.getPartitionSpec().get(partCol).toString());
+                }
+                loadPartition(stagingDir, table, partSpec, client);
+            }
        } else {
            moveFiles(stagingDir, new Path(table.getSd().getLocation()));
        }
@@ -223,7 +247,7 @@ public class HiveTableOutputFormat extends HadoopOutputFormatCommonBase<Row> imp
            serializer = (AbstractSerDe) Class.forName(sd.getSerdeInfo().getSerializationLib()).newInstance();
            ReflectionUtils.setConf(serializer, jobConf);
            // TODO: support partition properties, for now assume they're same as table properties
-            SerDeUtils.initializeSerDe(serializer, jobConf, tblProperties, null);
+            SerDeUtils.initializeSerDe(serializer, jobConf, tableProperties, null);
            outputClass = serializer.getSerializedClass();
        } catch (IllegalAccessException | SerDeException | InstantiationException | ClassNotFoundException e) {
            throw new FlinkRuntimeException("Error initializing Hive serializer", e);
@@ -243,10 +267,12 @@ public class HiveTableOutputFormat extends HadoopOutputFormatCommonBase<Row> imp
        if (!isDynamicPartition) {
            staticWriter = writerForLocation(hiveTablePartition.getStorageDescriptor().getLocation());
+        } else {
+            dynamicPartitionOffset = rowTypeInfo.getArity() - partitionColumns.size() + hiveTablePartition.getPartitionSpec().size();
        }
        List<ObjectInspector> objectInspectors = new ArrayList<>();
-        for (int i = 0; i < rowTypeInfo.getArity() - partitionCols.size(); i++) {
+        for (int i = 0; i < rowTypeInfo.getArity() - partitionColumns.size(); i++) {
            objectInspectors.add(HiveTableUtil.getObjectInspector(LegacyTypeInfoDataTypeConverter.toDataType(rowTypeInfo.getTypeAt(i))));
        }
@@ -254,12 +280,12 @@ public class HiveTableOutputFormat extends HadoopOutputFormatCommonBase<Row> imp
            rowObjectInspector = ObjectInspectorFactory.getStandardStructObjectInspector(
                Arrays.asList(rowTypeInfo.getFieldNames()), objectInspectors);
-            numNonPartitionCols = rowTypeInfo.getArity();
+            numNonPartitionColumns = rowTypeInfo.getArity();
        } else {
            rowObjectInspector = ObjectInspectorFactory.getStandardStructObjectInspector(
-                Arrays.asList(rowTypeInfo.getFieldNames()).subList(0, rowTypeInfo.getArity() - partitionCols.size()),
+                Arrays.asList(rowTypeInfo.getFieldNames()).subList(0, rowTypeInfo.getArity() - partitionColumns.size()),
                objectInspectors);
-            numNonPartitionCols = rowTypeInfo.getArity() - partitionCols.size();
+            numNonPartitionColumns = rowTypeInfo.getArity() - partitionColumns.size();
}
}
@@ -268,11 +294,48 @@ public class HiveTableOutputFormat extends HadoopOutputFormatCommonBase<Row> imp
        try {
            HivePartitionWriter partitionWriter = staticWriter;
            if (isDynamicPartition) {
-                // TODO: to be implemented
+                LinkedHashMap<String, String> dynPartSpec = new LinkedHashMap<>();
+                // only need to check the dynamic partitions
+                final int numStaticPart = hiveTablePartition.getPartitionSpec().size();
+                for (int i = dynamicPartitionOffset; i < record.getArity(); i++) {
+                    // TODO: seems Hive also just calls toString(), need further investigation to confirm
+                    // TODO: validate partition value
+                    String partVal = record.getField(i).toString();
+                    dynPartSpec.put(partitionColumns.get(i - dynamicPartitionOffset + numStaticPart), partVal);
+                }
+                String partName = Warehouse.makePartPath(dynPartSpec);
+                partitionWriter = partitionToWriter.get(partName);
+                if (partitionWriter == null) {
+                    String stagingDir = hiveTablePartition.getStorageDescriptor().getLocation();
+                    partitionWriter = writerForLocation(stagingDir + Path.SEPARATOR + partName);
+                    partitionToWriter.put(partName, partitionWriter);
+                }
            }
            partitionWriter.recordWriter.write(serializer.serialize(getConvertedRow(record), rowObjectInspector));
} catch (IOException | SerDeException e) {
throw new IOException("Could not write Record.", e);
+ } catch (MetaException e) {
+ throw new CatalogException(e);
+ }
+ }
+
+ // load a single partition
+    private void loadPartition(Path srcDir, Table table, Map<String, String> partSpec, HiveMetastoreClientWrapper client)
+            throws TException, IOException {
+        Path tblLocation = new Path(table.getSd().getLocation());
+        List<Partition> existingPart = client.listPartitions(databaseName, tableName,
+                new ArrayList<>(partSpec.values()), (short) 1);
+        Path destDir = existingPart.isEmpty() ? new Path(tblLocation, Warehouse.makePartPath(partSpec)) :
+                new Path(existingPart.get(0).getSd().getLocation());
+        moveFiles(srcDir, destDir);
+        // register new partition if it doesn't exist
+        if (existingPart.isEmpty()) {
+            StorageDescriptor sd = new StorageDescriptor(hiveTablePartition.getStorageDescriptor());
+            sd.setLocation(destDir.toString());
+            Partition partition = HiveTableUtil.createHivePartition(databaseName, tableName,
+                    new ArrayList<>(partSpec.values()), sd, new HashMap<>());
+ partition.setValues(new ArrayList<>(partSpec.values()));
+ client.add_partition(partition);
}
}
@@ -320,8 +383,8 @@ public class HiveTableOutputFormat extends HadoopOutputFormatCommonBase<Row> imp
// converts a Row to a list so that Hive can serialize it
private Object getConvertedRow(Row record) {
- List<Object> res = new ArrayList<>(numNonPartitionCols);
- for (int i = 0; i < numNonPartitionCols; i++) {
+ List<Object> res = new ArrayList<>(numNonPartitionColumns);
+ for (int i = 0; i < numNonPartitionColumns; i++) {
res.add(record.getField(i));
}
return res;
@@ -388,7 +451,7 @@ public class HiveTableOutputFormat extends HadoopOutputFormatCommonBase<Row> imp
        FileSinkOperator.RecordWriter recordWriter;
        try {
            recordWriter = HiveFileFormatUtils.getRecordWriter(clonedConf, outputFormat,
-                outputClass, isCompressed, tblProperties, taskPath, Reporter.NULL);
+                outputClass, isCompressed, tableProperties, taskPath, Reporter.NULL);
} catch (HiveException e) {
throw new IOException(e);
}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableUtil.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableUtil.java
deleted file mode 100644
index 70d3852..0000000
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableUtil.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.batch.connectors.hive;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.table.catalog.hive.util.HiveTypeUtil;
-import org.apache.flink.table.types.DataType;
-
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
-import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
-
-import java.io.IOException;
-
-/**
- * Util class for accessing Hive tables.
- */
-public class HiveTableUtil {
-
- private HiveTableUtil() {
- }
-
- /**
- * Get Hive {@link ObjectInspector} for a Flink {@link TypeInformation}.
- */
-    public static ObjectInspector getObjectInspector(DataType flinkType) throws IOException {
-        return getObjectInspector(HiveTypeUtil.toHiveTypeInfo(flinkType));
- }
-
- // TODO: reuse Hive's TypeInfoUtils?
-    private static ObjectInspector getObjectInspector(TypeInfo type) throws IOException {
- switch (type.getCategory()) {
-
- case PRIMITIVE:
-                PrimitiveTypeInfo primitiveType = (PrimitiveTypeInfo) type;
-                return PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(primitiveType);
-
- // TODO: support complex types
- default:
-                throw new IOException("Unsupported Hive type category " + type.getCategory());
- }
- }
-}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
index 2068f68..78cb3bd 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -784,7 +784,6 @@ public class HiveCatalog extends AbstractCatalog {
    private Partition instantiateHivePartition(Table hiveTable, CatalogPartitionSpec partitionSpec, CatalogPartition catalogPartition)
throws PartitionSpecInvalidException {
- Partition partition = new Partition();
        List<String> partCols = getFieldNames(hiveTable.getPartitionKeys());
        List<String> partValues = getOrderedFullPartitionValues(
            partitionSpec, partCols, new ObjectPath(hiveTable.getDbName(), hiveTable.getTableName()));
@@ -797,15 +796,10 @@ public class HiveCatalog extends AbstractCatalog {
}
// TODO: handle GenericCatalogPartition
        HiveCatalogPartition hiveCatalogPartition = (HiveCatalogPartition) catalogPartition;
- partition.setValues(partValues);
- partition.setDbName(hiveTable.getDbName());
- partition.setTableName(hiveTable.getTableName());
-        partition.setCreateTime((int) (System.currentTimeMillis() / 1000));
- partition.setParameters(hiveCatalogPartition.getProperties());
- partition.setSd(hiveTable.getSd().deepCopy());
-        partition.getSd().setLocation(hiveCatalogPartition.getLocation());
-
- return partition;
+ StorageDescriptor sd = hiveTable.getSd().deepCopy();
+ sd.setLocation(hiveCatalogPartition.getLocation());
+        return HiveTableUtil.createHivePartition(hiveTable.getDbName(), hiveTable.getTableName(), partValues,
+ sd, hiveCatalogPartition.getProperties());
}
    private static CatalogPartition instantiateCatalogPartition(Partition hivePartition) {
@@ -822,7 +816,7 @@ public class HiveCatalog extends AbstractCatalog {
/**
* Get field names from field schemas.
*/
-    private static List<String> getFieldNames(List<FieldSchema> fieldSchemas) {
+    public static List<String> getFieldNames(List<FieldSchema> fieldSchemas) {
List<String> names = new ArrayList<>(fieldSchemas.size());
for (FieldSchema fs : fieldSchemas) {
names.add(fs.getName());
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java
index 712e709..d4ed021 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java
@@ -18,17 +18,24 @@
package org.apache.flink.table.catalog.hive.util;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.types.DataType;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.SerDeInfo;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -115,4 +122,42 @@ public class HiveTableUtil {
return properties;
}
+ /**
+ * Creates a Hive partition instance.
+ */
+    public static Partition createHivePartition(String dbName, String tableName, List<String> values,
+ StorageDescriptor sd, Map<String, String> parameters) {
+ Partition partition = new Partition();
+ partition.setDbName(dbName);
+ partition.setTableName(tableName);
+ partition.setValues(values);
+ partition.setParameters(parameters);
+ partition.setSd(sd);
+ int currentTime = (int) (System.currentTimeMillis() / 1000);
+ partition.setCreateTime(currentTime);
+ partition.setLastAccessTime(currentTime);
+ return partition;
+ }
+
+ /**
+ * Get Hive {@link ObjectInspector} for a Flink {@link TypeInformation}.
+ */
+    public static ObjectInspector getObjectInspector(DataType flinkType) throws IOException {
+        return getObjectInspector(HiveTypeUtil.toHiveTypeInfo(flinkType));
+ }
+
+ // TODO: reuse Hive's TypeInfoUtils?
+    private static ObjectInspector getObjectInspector(TypeInfo type) throws IOException {
+ switch (type.getCategory()) {
+
+ case PRIMITIVE:
+                PrimitiveTypeInfo primitiveType = (PrimitiveTypeInfo) type;
+                return PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(primitiveType);
+
+ // TODO: support complex types
+ default:
+                throw new IOException("Unsupported Hive type category " + type.getCategory());
+ }
+ }
+
}
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveInputFormatTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveInputFormatTest.java
index 1f9aaa7..d348d4c 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveInputFormatTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveInputFormatTest.java
@@ -26,13 +26,12 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.catalog.hive.HiveTestUtils;
+import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory;
+import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper;
import org.apache.flink.table.catalog.hive.util.HiveTableUtil;
import org.apache.flink.types.Row;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.apache.hadoop.hive.metastore.IMetaStoreClient;
-import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.SerDeInfo;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.mapred.JobConf;
@@ -87,7 +86,7 @@ public class HiveInputFormatTest {
);
        //Now we used metaStore client to create hive table instead of using hiveCatalog for it doesn't support set
        //serDe temporarily.
-        IMetaStoreClient client = RetryingMetaStoreClient.getProxy(hiveConf, null, null, HiveMetaStoreClient.class.getName(), true);
+        HiveMetastoreClientWrapper client = HiveMetastoreClientFactory.create(hiveConf);
        org.apache.hadoop.hive.metastore.api.Table tbl = new org.apache.hadoop.hive.metastore.api.Table();
tbl.setDbName(dbName);
tbl.setTableName(tblName);
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormatTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormatTest.java
index e6d79bd..5bf32b6 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormatTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormatTest.java
@@ -22,8 +22,10 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.hive.HiveCatalog;
+import org.apache.flink.table.catalog.hive.HiveCatalogPartition;
import org.apache.flink.table.catalog.hive.HiveCatalogTable;
import org.apache.flink.table.catalog.hive.HiveTestUtils;
import org.apache.flink.types.Row;
@@ -43,9 +45,13 @@ import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -76,18 +82,18 @@ public class HiveTableOutputFormatTest {
public void testInsertIntoNonPartitionTable() throws Exception {
String dbName = "default";
String tblName = "dest";
- RowTypeInfo rowTypeInfo = createDestTable(dbName, tblName);
+ RowTypeInfo rowTypeInfo = createDestTable(dbName, tblName, 0);
ObjectPath tablePath = new ObjectPath(dbName, tblName);
Table hiveTable = hiveCatalog.getHiveTable(tablePath);
-        HiveTableOutputFormat outputFormat = createHiveTableOutputFormat(tablePath, hiveTable, rowTypeInfo, false);
+        HiveTableOutputFormat outputFormat = createHiveTableOutputFormat(tablePath, hiveTable, rowTypeInfo, null, false);
outputFormat.open(0, 1);
List<Row> toWrite = generateRecords(5);
writeRecords(toWrite, outputFormat);
outputFormat.close();
outputFormat.finalizeGlobal(1);
-        verifyWrittenData(new Path(hiveTable.getSd().getLocation(), "0"), toWrite);
+        verifyWrittenData(new Path(hiveTable.getSd().getLocation(), "0"), toWrite, 0);
hiveCatalog.dropTable(tablePath, false);
}
@@ -95,32 +101,83 @@ public class HiveTableOutputFormatTest {
public void testInsertOverwrite() throws Exception {
String dbName = "default";
String tblName = "dest";
- RowTypeInfo rowTypeInfo = createDestTable(dbName, tblName);
+ RowTypeInfo rowTypeInfo = createDestTable(dbName, tblName, 0);
ObjectPath tablePath = new ObjectPath(dbName, tblName);
Table hiveTable = hiveCatalog.getHiveTable(tablePath);
// write some data and verify
-        HiveTableOutputFormat outputFormat = createHiveTableOutputFormat(tablePath, hiveTable, rowTypeInfo, false);
+        HiveTableOutputFormat outputFormat = createHiveTableOutputFormat(tablePath, hiveTable, rowTypeInfo, null, false);
outputFormat.open(0, 1);
List<Row> toWrite = generateRecords(5);
writeRecords(toWrite, outputFormat);
outputFormat.close();
outputFormat.finalizeGlobal(1);
-        verifyWrittenData(new Path(hiveTable.getSd().getLocation(), "0"), toWrite);
+        verifyWrittenData(new Path(hiveTable.getSd().getLocation(), "0"), toWrite, 0);
// write some data to overwrite existing data and verify
-        outputFormat = createHiveTableOutputFormat(tablePath, hiveTable, rowTypeInfo, true);
+        outputFormat = createHiveTableOutputFormat(tablePath, hiveTable, rowTypeInfo, null, true);
outputFormat.open(0, 1);
toWrite = generateRecords(3);
writeRecords(toWrite, outputFormat);
outputFormat.close();
outputFormat.finalizeGlobal(1);
-        verifyWrittenData(new Path(hiveTable.getSd().getLocation(), "0"), toWrite);
+        verifyWrittenData(new Path(hiveTable.getSd().getLocation(), "0"), toWrite, 0);
hiveCatalog.dropTable(tablePath, false);
}
-    private RowTypeInfo createDestTable(String dbName, String tblName) throws Exception {
+ @Test
+ public void testInsertIntoStaticPartition() throws Exception {
+ String dbName = "default";
+ String tblName = "dest";
+ RowTypeInfo rowTypeInfo = createDestTable(dbName, tblName, 1);
+ ObjectPath tablePath = new ObjectPath(dbName, tblName);
+ Table hiveTable = hiveCatalog.getHiveTable(tablePath);
+
+ Map<String, Object> partSpec = new HashMap<>();
+ partSpec.put("s", "a");
+        HiveTableOutputFormat outputFormat = createHiveTableOutputFormat(tablePath, hiveTable, rowTypeInfo, partSpec, false);
+ outputFormat.open(0, 1);
+ List<Row> toWrite = generateRecords(1);
+ writeRecords(toWrite, outputFormat);
+ outputFormat.close();
+ outputFormat.finalizeGlobal(1);
+
+ // make sure new partition is created
+        assertEquals(toWrite.size(), hiveCatalog.listPartitions(tablePath).size());
+        HiveCatalogPartition catalogPartition = (HiveCatalogPartition) hiveCatalog.getPartition(tablePath, new CatalogPartitionSpec(
+                partSpec.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()))));
+        verifyWrittenData(new Path(catalogPartition.getLocation(), "0"), toWrite, 1);
+
+ hiveCatalog.dropTable(tablePath, false);
+ }
+
+ @Test
+ public void testInsertIntoDynamicPartition() throws Exception {
+ String dbName = "default";
+ String tblName = "dest";
+ RowTypeInfo rowTypeInfo = createDestTable(dbName, tblName, 1);
+ ObjectPath tablePath = new ObjectPath(dbName, tblName);
+ Table hiveTable = hiveCatalog.getHiveTable(tablePath);
+
+        HiveTableOutputFormat outputFormat = createHiveTableOutputFormat(tablePath, hiveTable, rowTypeInfo, Collections.emptyMap(), false);
+ outputFormat.open(0, 1);
+ List<Row> toWrite = generateRecords(5);
+ writeRecords(toWrite, outputFormat);
+ outputFormat.close();
+ outputFormat.finalizeGlobal(1);
+
+        List<CatalogPartitionSpec> partitionSpecs = hiveCatalog.listPartitions(tablePath);
+ assertEquals(toWrite.size(), partitionSpecs.size());
+ for (int i = 0; i < toWrite.size(); i++) {
+            HiveCatalogPartition partition = (HiveCatalogPartition) hiveCatalog.getPartition(tablePath, partitionSpecs.get(i));
+            verifyWrittenData(new Path(partition.getLocation(), "0"), Collections.singletonList(toWrite.get(i)), 1);
+ }
+
+ hiveCatalog.dropTable(tablePath, false);
+ }
+
+    private RowTypeInfo createDestTable(String dbName, String tblName, int numPartCols) throws Exception {
ObjectPath tablePath = new ObjectPath(dbName, tblName);
TableSchema tableSchema = new TableSchema(
new String[]{"i", "l", "d", "s"},
@@ -130,29 +187,41 @@ public class HiveTableOutputFormatTest {
BasicTypeInfo.DOUBLE_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO}
);
-        HiveCatalogTable catalogTable = new HiveCatalogTable(tableSchema, new HashMap<>(), "");
+        HiveCatalogTable catalogTable = createCatalogTable(tableSchema, numPartCols);
        hiveCatalog.createTable(tablePath, catalogTable, false);
        return new RowTypeInfo(tableSchema.getFieldTypes(), tableSchema.getFieldNames());
}
+    private HiveCatalogTable createCatalogTable(TableSchema tableSchema, int numPartCols) {
+        if (numPartCols == 0) {
+            return new HiveCatalogTable(tableSchema, new HashMap<>(), "");
+        }
+        String[] partCols = new String[numPartCols];
+        System.arraycopy(tableSchema.getFieldNames(), tableSchema.getFieldNames().length - numPartCols, partCols, 0, numPartCols);
+        return new HiveCatalogTable(tableSchema, Arrays.asList(partCols), new HashMap<>(), "");
+ }
+
    private HiveTableOutputFormat createHiveTableOutputFormat(ObjectPath tablePath, Table hiveTable,
-            RowTypeInfo rowTypeInfo, boolean overwrite) throws Exception {
+            RowTypeInfo rowTypeInfo, Map<String, Object> partSpec, boolean overwrite) throws Exception {
        StorageDescriptor jobSD = hiveTable.getSd().deepCopy();
        jobSD.setLocation(hiveTable.getSd().getLocation() + "/.staging");
-        HiveTablePartition hiveTablePartition = new HiveTablePartition(jobSD, null);
+        HiveTablePartition hiveTablePartition = new HiveTablePartition(jobSD, partSpec);
JobConf jobConf = new JobConf(hiveConf);
        return new HiveTableOutputFormat(jobConf, tablePath.getDatabaseName(), tablePath.getObjectName(),
-            Collections.emptyList(), rowTypeInfo, hiveTablePartition, MetaStoreUtils.getTableMetadata(hiveTable), overwrite);
+            HiveCatalog.getFieldNames(hiveTable.getPartitionKeys()), rowTypeInfo, hiveTablePartition,
+            MetaStoreUtils.getTableMetadata(hiveTable), overwrite);
}
-    private void verifyWrittenData(Path outputFile, List<Row> expected) throws Exception {
+    private void verifyWrittenData(Path outputFile, List<Row> expected, int numPartCols) throws Exception {
        FileSystem fs = outputFile.getFileSystem(hiveConf);
        assertTrue(fs.exists(outputFile));
+        int[] fields = IntStream.range(0, expected.get(0).getArity() - numPartCols).toArray();
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(outputFile)))) {
int numWritten = 0;
String line = reader.readLine();
while (line != null) {
-                assertEquals(expected.get(numWritten++).toString(), line.replaceAll("\u0001", ","));
+                Row expectedRow = Row.project(expected.get(numWritten++), fields);
+                assertEquals(expectedRow.toString(), line.replaceAll("\u0001", ","));
line = reader.readLine();
}
reader.close();