This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.10 by this push:
new d2b05d5 [FLINK-16767][hive] Failed to read Hive table with RegexSerDe
d2b05d5 is described below
commit d2b05d50fc9134919a4a74992cbc64b21b8feed0
Author: Rui Li <[email protected]>
AuthorDate: Tue Apr 14 15:53:04 2020 +0800
[FLINK-16767][hive] Failed to read Hive table with RegexSerDe
This closes #11579
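
In short, the patch stops rebuilding SerDe properties by hand from the
StorageDescriptor and instead threads the table's metastore properties
(obtained via HiveReflectionUtils.getTableMetadata) through
HiveTablePartition into SerDe initialization, so SerDes such as RegexSerDe
see table-level parameters like input.regex. Below is a minimal sketch of
the fixed initialization path; the class and method names are hypothetical,
while the Hive calls mirror the patch:

    import java.util.Properties;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.serde2.Deserializer;
    import org.apache.hadoop.hive.serde2.SerDeUtils;

    public class SerDeInitSketch {
        // Hypothetical helper: serdeLib is the SerDe class name from the
        // StorageDescriptor; tableProps is the table metadata fetched from
        // the metastore, which carries SerDe parameters such as input.regex.
        public static Deserializer initDeserializer(String serdeLib, Properties tableProps)
                throws Exception {
            Deserializer deserializer = (Deserializer) Class.forName(serdeLib).newInstance();
            // Initialize with the full table properties (the last argument is
            // partition properties, null here), as HiveMapredSplitReader now does.
            SerDeUtils.initializeSerDe(deserializer, new Configuration(), tableProps, null);
            return deserializer;
        }
    }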
---
.../flink/connectors/hive/HiveTablePartition.java | 15 ++++++++--
.../flink/connectors/hive/HiveTableSource.java | 9 ++++--
.../hive/read/HiveMapredSplitReader.java | 6 +---
.../table/catalog/hive/util/HiveTableUtil.java | 35 ----------------------
.../connectors/hive/TableEnvHiveConnectorTest.java | 18 +++++++++++
5 files changed, 38 insertions(+), 45 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTablePartition.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTablePartition.java
index f85ae01..4e1a904 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTablePartition.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTablePartition.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import java.io.Serializable;
import java.util.LinkedHashMap;
import java.util.Map;
+import java.util.Properties;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -40,13 +41,17 @@ public class HiveTablePartition implements Serializable {
/** The map of partition key names and their values. */
private final Map<String, Object> partitionSpec;
- public HiveTablePartition(StorageDescriptor storageDescriptor) {
- this(storageDescriptor, new LinkedHashMap<>());
+ // Table properties that should be used to initialize SerDe
+ private final Properties tableProps;
+
+ public HiveTablePartition(StorageDescriptor storageDescriptor, Properties tableProps) {
+ this(storageDescriptor, new LinkedHashMap<>(), tableProps);
}
- public HiveTablePartition(StorageDescriptor storageDescriptor, Map<String, Object> partitionSpec) {
+ public HiveTablePartition(StorageDescriptor storageDescriptor, Map<String, Object> partitionSpec, Properties tableProps) {
this.storageDescriptor = checkNotNull(storageDescriptor, "storageDescriptor can not be null");
this.partitionSpec = checkNotNull(partitionSpec, "partitionSpec can not be null");
+ this.tableProps = checkNotNull(tableProps, "tableProps can not be null");
}
public StorageDescriptor getStorageDescriptor() {
@@ -56,4 +61,8 @@ public class HiveTablePartition implements Serializable {
public Map<String, Object> getPartitionSpec() {
return partitionSpec;
}
+
+ public Properties getTableProps() {
+ return tableProps;
+ }
}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
index 98691b9..605bbbb 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
@@ -36,6 +36,7 @@ import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper;
import org.apache.flink.table.catalog.hive.client.HiveShim;
import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
import org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator;
+import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.functions.hive.conversion.HiveInspectors;
import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter;
@@ -52,6 +53,7 @@ import org.apache.flink.util.Preconditions;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.mapred.JobConf;
import org.apache.thrift.TException;
import org.slf4j.Logger;
@@ -67,6 +69,7 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import java.util.stream.Collectors;
/**
@@ -232,6 +235,8 @@ public class HiveTableSource implements
String dbName = tablePath.getDatabaseName();
String tableName = tablePath.getObjectName();
List<String> partitionColNames = catalogTable.getPartitionKeys();
+ Table hiveTable = client.getTable(dbName, tableName);
+ Properties tableProps = HiveReflectionUtils.getTableMetadata(hiveShim, hiveTable);
if (partitionColNames != null && partitionColNames.size() > 0) {
final String defaultPartitionName = jobConf.get(HiveConf.ConfVars.DEFAULTPARTITIONNAME.varname, HiveConf.ConfVars.DEFAULTPARTITIONNAME.defaultStrVal);
@@ -260,11 +265,11 @@ public class HiveTableSource implements
}
partitionColValues.put(partitionColName, partitionObject);
}
- HiveTablePartition hiveTablePartition = new HiveTablePartition(sd, partitionColValues);
+ HiveTablePartition hiveTablePartition = new HiveTablePartition(sd, partitionColValues, tableProps);
allHivePartitions.add(hiveTablePartition);
}
} else {
- allHivePartitions.add(new HiveTablePartition(client.getTable(dbName, tableName).getSd()));
+ allHivePartitions.add(new HiveTablePartition(hiveTable.getSd(), tableProps));
}
} catch (TException e) {
throw new FlinkHiveException("Failed to collect all partitions from hive metaStore", e);
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveMapredSplitReader.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveMapredSplitReader.java
index 877b52a..8164665 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveMapredSplitReader.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveMapredSplitReader.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter;
import org.apache.flink.connectors.hive.FlinkHiveException;
import org.apache.flink.connectors.hive.HiveTablePartition;
import org.apache.flink.table.catalog.hive.client.HiveShim;
-import org.apache.flink.table.catalog.hive.util.HiveTableUtil;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.dataformat.DataFormatConverters;
import org.apache.flink.table.dataformat.GenericRow;
@@ -48,7 +47,6 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
-import java.util.Properties;
import static org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR;
@@ -122,9 +120,7 @@ public class HiveMapredSplitReader implements SplitReader {
try {
deserializer = (Deserializer) Class.forName(sd.getSerdeInfo().getSerializationLib()).newInstance();
Configuration conf = new Configuration();
- //properties are used to initialize hive Deserializer properly.
- Properties properties = HiveTableUtil.createPropertiesFromStorageDescriptor(sd);
- SerDeUtils.initializeSerDe(deserializer, conf, properties, null);
+ SerDeUtils.initializeSerDe(deserializer, conf, hiveTablePartition.getTableProps(), null);
structObjectInspector = (StructObjectInspector) deserializer.getObjectInspector();
structFields = structObjectInspector.getAllStructFieldRefs();
} catch (Exception e) {
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java
index d69dd4c..a696708 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java
@@ -33,13 +33,9 @@ import org.apache.flink.table.functions.hive.conversion.HiveInspectors;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
-import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.SerDeInfo;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
-import org.apache.hadoop.hive.serde.serdeConstants;
-import org.apache.hadoop.hive.serde2.SerDeUtils;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import java.util.ArrayList;
@@ -47,11 +43,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.Properties;
import java.util.Set;
-import static org.apache.flink.table.catalog.hive.HiveCatalogConfig.DEFAULT_LIST_COLUMN_TYPES_SEPARATOR;
-
/**
* Utils to for Hive-backed table.
*/
@@ -114,34 +107,6 @@ public class HiveTableUtil {
// --------------------------------------------------------------------------------------------
/**
- * Create properties info to initialize a SerDe.
- * @param storageDescriptor
- * @return
- */
- public static Properties createPropertiesFromStorageDescriptor(StorageDescriptor storageDescriptor) {
- SerDeInfo serDeInfo = storageDescriptor.getSerdeInfo();
- Map<String, String> parameters = serDeInfo.getParameters();
- Properties properties = new Properties();
- properties.setProperty(
- serdeConstants.SERIALIZATION_FORMAT,
- parameters.get(serdeConstants.SERIALIZATION_FORMAT));
- List<String> colTypes = new ArrayList<>();
- List<String> colNames = new ArrayList<>();
- List<FieldSchema> cols = storageDescriptor.getCols();
- for (FieldSchema col: cols){
- colTypes.add(col.getType());
- colNames.add(col.getName());
- }
- properties.setProperty(serdeConstants.LIST_COLUMNS, StringUtils.join(colNames, String.valueOf(SerDeUtils.COMMA)));
- // Note: serdeConstants.COLUMN_NAME_DELIMITER is not defined in previous Hive. We use a literal to save on shim
- properties.setProperty("column.name.delimite", String.valueOf(SerDeUtils.COMMA));
- properties.setProperty(serdeConstants.LIST_COLUMN_TYPES, StringUtils.join(colTypes, DEFAULT_LIST_COLUMN_TYPES_SEPARATOR));
- properties.setProperty(serdeConstants.SERIALIZATION_NULL_FORMAT, "NULL");
- properties.putAll(parameters);
- return properties;
- }
-
- /**
* Creates a Hive partition instance.
*/
public static Partition createHivePartition(String dbName, String tableName, List<String> values,
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
index dc5360e..21b7c60 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
@@ -534,6 +534,24 @@ public class TableEnvHiveConnectorTest {
}
}
+ @Test
+ public void testRegexSerDe() throws Exception {
+ hiveShell.execute("create database db1");
+ try {
+ hiveShell.execute("create table db1.src (x int,y
string) " +
+ "row format serde
'org.apache.hadoop.hive.serde2.RegexSerDe' " +
+ "with serdeproperties
('input.regex'='([\\\\d]+)\\u0001([\\\\S]+)')");
+ HiveTestUtils.createTextTableInserter(hiveShell, "db1",
"src")
+ .addRow(new Object[]{1, "a"})
+ .addRow(new Object[]{2, "ab"})
+ .commit();
+ TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+ assertEquals("[1,a, 2,ab]", TableUtils.collectToList(tableEnv.sqlQuery("select * from db1.src order by x")).toString());
+ } finally {
+ hiveShell.execute("drop database db1 cascade");
+ }
+ }
+
private TableEnvironment getTableEnvWithHiveCatalog() {
TableEnvironment tableEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode();
tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);