This is an automated email from the ASF dual-hosted git repository.
kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 24078de [FLINK-13192][hive] Add tests for different Hive table formats
24078de is described below
commit 24078de3dc9f9c4f016da3d1acd909d8311431dd
Author: Rui Li <[email protected]>
AuthorDate: Mon Jul 29 21:02:52 2019 +0800
[FLINK-13192][hive] Add tests for different Hive table formats
This closes #9264
---
flink-connectors/flink-connector-hive/pom.xml | 4 ---
.../connectors/hive/HiveTableOutputFormat.java | 19 ++++++----
.../flink/table/catalog/hive/HiveCatalog.java | 23 +++++++++---
.../connectors/hive/TableEnvHiveConnectorTest.java | 42 ++++++++++++++++++++++
4 files changed, 73 insertions(+), 15 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/pom.xml b/flink-connectors/flink-connector-hive/pom.xml
index 88ef8a1..9752be9 100644
--- a/flink-connectors/flink-connector-hive/pom.xml
+++ b/flink-connectors/flink-connector-hive/pom.xml
@@ -416,10 +416,6 @@ under the License.
<artifactId>tez-common</artifactId>
</exclusion>
<exclusion>
- <groupId>org.apache.tez</groupId>
- <artifactId>tez-mapreduce</artifactId>
- </exclusion>
- <exclusion>
<!-- This dependency is no longer
shipped with the JDK since Java 9.-->
<groupId>jdk.tools</groupId>
<artifactId>jdk.tools</artifactId>
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableOutputFormat.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableOutputFormat.java
index e4caac1..9e1ee46 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableOutputFormat.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableOutputFormat.java
@@ -58,9 +58,10 @@ import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.Serializer;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
@@ -122,7 +123,8 @@ public class HiveTableOutputFormat extends HadoopOutputFormatCommonBase<Row> imp
// number of non-partitioning columns
private transient int numNonPartitionColumns;
- private transient AbstractSerDe serializer;
+ // SerDe in Hive-1.2.1 and Hive-2.3.4 can be of different classes, make
sure to use a common base class
+ private transient Serializer recordSerDe;
//StructObjectInspector represents the hive row structure.
private transient StructObjectInspector rowObjectInspector;
private transient Class<? extends Writable> outputClass;
@@ -257,11 +259,14 @@ public class HiveTableOutputFormat extends HadoopOutputFormatCommonBase<Row> imp
public void open(int taskNumber, int numTasks) throws IOException {
try {
StorageDescriptor sd =
hiveTablePartition.getStorageDescriptor();
- serializer = (AbstractSerDe)
Class.forName(sd.getSerdeInfo().getSerializationLib()).newInstance();
- ReflectionUtils.setConf(serializer, jobConf);
+ Object serdeLib =
Class.forName(sd.getSerdeInfo().getSerializationLib()).newInstance();
+ Preconditions.checkArgument(serdeLib instanceof
Serializer && serdeLib instanceof Deserializer,
+ "Expect a SerDe lib implementing both
Serializer and Deserializer, but actually got " +
serdeLib.getClass().getName());
+ recordSerDe = (Serializer) serdeLib;
+ ReflectionUtils.setConf(recordSerDe, jobConf);
// TODO: support partition properties, for now assume
they're same as table properties
- SerDeUtils.initializeSerDe(serializer, jobConf,
tableProperties, null);
- outputClass = serializer.getSerializedClass();
+ SerDeUtils.initializeSerDe((Deserializer) recordSerDe,
jobConf, tableProperties, null);
+ outputClass = recordSerDe.getSerializedClass();
} catch (IllegalAccessException | SerDeException |
InstantiationException | ClassNotFoundException e) {
throw new FlinkRuntimeException("Error initializing
Hive serializer", e);
}
@@ -331,7 +336,7 @@ public class HiveTableOutputFormat extends HadoopOutputFormatCommonBase<Row> imp
partitionToWriter.put(partName,
partitionWriter);
}
}
-
partitionWriter.recordWriter.write(serializer.serialize(getConvertedRow(record),
rowObjectInspector));
+
partitionWriter.recordWriter.write(recordSerDe.serialize(getConvertedRow(record),
rowObjectInspector));
} catch (IOException | SerDeException e) {
throw new IOException("Could not write Record.", e);
} catch (MetaException e) {
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
index 261180f..dd50ce2 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -81,6 +81,7 @@ import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.UnknownDBException;
import org.apache.hadoop.hive.ql.io.StorageFormatDescriptor;
import org.apache.hadoop.hive.ql.io.StorageFormatFactory;
+import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
import org.apache.hive.common.util.HiveStringUtils;
import org.apache.thrift.TException;
@@ -323,7 +324,7 @@ public class HiveCatalog extends AbstractCatalog {
checkNotNull(tablePath, "tablePath cannot be null");
Table hiveTable = getHiveTable(tablePath);
- return instantiateCatalogTable(hiveTable);
+ return instantiateCatalogTable(hiveTable, hiveConf);
}
@Override
@@ -394,7 +395,7 @@ public class HiveCatalog extends AbstractCatalog {
return;
}
- CatalogBaseTable existingTable =
instantiateCatalogTable(hiveTable);
+ CatalogBaseTable existingTable =
instantiateCatalogTable(hiveTable, hiveConf);
if (existingTable.getClass() != newCatalogTable.getClass()) {
throw new CatalogException(
@@ -493,7 +494,7 @@ public class HiveCatalog extends AbstractCatalog {
}
}
- private static CatalogBaseTable instantiateCatalogTable(Table
hiveTable) {
+ private static CatalogBaseTable instantiateCatalogTable(Table
hiveTable, HiveConf hiveConf) {
boolean isView = TableType.valueOf(hiveTable.getTableType()) ==
TableType.VIRTUAL_VIEW;
// Table properties
@@ -506,8 +507,22 @@ public class HiveCatalog extends AbstractCatalog {
String comment = properties.remove(HiveCatalogConfig.COMMENT);
// Table schema
+ List<FieldSchema> fields;
+ if
(org.apache.hadoop.hive.ql.metadata.Table.hasMetastoreBasedSchema(hiveConf,
+
hiveTable.getSd().getSerdeInfo().getSerializationLib())) {
+ // get schema from metastore
+ fields = hiveTable.getSd().getCols();
+ } else {
+ // get schema from deserializer
+ try {
+ fields =
MetaStoreUtils.getFieldsFromDeserializer(hiveTable.getTableName(),
+
MetaStoreUtils.getDeserializer(hiveConf, hiveTable, true));
+ } catch (SerDeException | MetaException e) {
+ throw new CatalogException("Failed to get Hive
table schema from deserializer", e);
+ }
+ }
TableSchema tableSchema =
-
HiveTableUtil.createTableSchema(hiveTable.getSd().getCols(),
hiveTable.getPartitionKeys());
+ HiveTableUtil.createTableSchema(fields,
hiveTable.getPartitionKeys());
// Partition keys
List<String> partitionKeys = new ArrayList<>();
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
index 4ac76cd..e8c402a 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
@@ -36,6 +36,8 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -100,10 +102,50 @@ public class TableEnvHiveConnectorTest {
hiveShell.execute("drop database db1 cascade");
}
+ @Test
+ public void testDifferentFormats() throws Exception {
+ String[] formats = new String[]{"orc", "parquet",
"sequencefile", "csv"};
+ for (String format : formats) {
+ readWriteFormat(format);
+ }
+ }
+
+ private void readWriteFormat(String format) throws Exception {
+ TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+
+ hiveShell.execute("create database db1");
+
+ // create source and dest tables
+ String suffix;
+ if (format.equals("csv")) {
+ suffix = "row format serde
'org.apache.hadoop.hive.serde2.OpenCSVSerde'";
+ } else {
+ suffix = "stored as " + format;
+ }
+ hiveShell.execute("create table db1.src (i int,s string) " +
suffix);
+ hiveShell.execute("create table db1.dest (i int,s string) " +
suffix);
+
+ // prepare source data with Hive
+ hiveShell.execute("insert into db1.src values (1,'a'),(2,'b')");
+
+ // populate dest table with source table
+ tableEnv.sqlUpdate("insert into db1.dest select * from
db1.src");
+ tableEnv.execute("test_" + format);
+
+ // verify data on hive side
+ verifyHiveQueryResult("select * from db1.dest",
Arrays.asList("1\ta", "2\tb"));
+
+ hiveShell.execute("drop database db1 cascade");
+ }
+
private TableEnvironment getTableEnvWithHiveCatalog() {
TableEnvironment tableEnv = HiveTestUtils.createTableEnv();
tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
tableEnv.useCatalog(hiveCatalog.getName());
return tableEnv;
}
+
+ private void verifyHiveQueryResult(String query, List<String> expected)
{
+ assertEquals(new HashSet<>(expected), new
HashSet<>(hiveShell.executeQuery(query)));
+ }
}