This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.10 by this push:
     new d2b05d5  [FLINK-16767][hive] Failed to read Hive table with RegexSerDe
d2b05d5 is described below

commit d2b05d50fc9134919a4a74992cbc64b21b8feed0
Author: Rui Li <[email protected]>
AuthorDate: Tue Apr 14 15:53:04 2020 +0800

    [FLINK-16767][hive] Failed to read Hive table with RegexSerDe
    
    
    This closes #11579
---
 .../flink/connectors/hive/HiveTablePartition.java  | 15 ++++++++--
 .../flink/connectors/hive/HiveTableSource.java     |  9 ++++--
 .../hive/read/HiveMapredSplitReader.java           |  6 +---
 .../table/catalog/hive/util/HiveTableUtil.java     | 35 ----------------------
 .../connectors/hive/TableEnvHiveConnectorTest.java | 18 +++++++++++
 5 files changed, 38 insertions(+), 45 deletions(-)

diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTablePartition.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTablePartition.java
index f85ae01..4e1a904 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTablePartition.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTablePartition.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import java.io.Serializable;
 import java.util.LinkedHashMap;
 import java.util.Map;
+import java.util.Properties;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -40,13 +41,17 @@ public class HiveTablePartition implements Serializable {
        /** The map of partition key names and their values. */
        private final Map<String, Object> partitionSpec;
 
-       public HiveTablePartition(StorageDescriptor storageDescriptor) {
-               this(storageDescriptor, new LinkedHashMap<>());
+       /** Table properties that should be used to initialize the SerDe. */
+       private final Properties tableProps;
+
+       public HiveTablePartition(StorageDescriptor storageDescriptor, 
Properties tableProps) {
+               this(storageDescriptor, new LinkedHashMap<>(), tableProps);
        }
 
-       public HiveTablePartition(StorageDescriptor storageDescriptor, 
Map<String, Object> partitionSpec) {
+       public HiveTablePartition(StorageDescriptor storageDescriptor, 
Map<String, Object> partitionSpec, Properties tableProps) {
                this.storageDescriptor = checkNotNull(storageDescriptor, 
"storageDescriptor can not be null");
                this.partitionSpec = checkNotNull(partitionSpec, "partitionSpec 
can not be null");
+               this.tableProps = checkNotNull(tableProps, "tableProps can not 
be null");
        }
 
        public StorageDescriptor getStorageDescriptor() {
@@ -56,4 +61,8 @@ public class HiveTablePartition implements Serializable {
        public Map<String, Object> getPartitionSpec() {
                return partitionSpec;
        }
+
+       public Properties getTableProps() {
+               return tableProps;
+       }
 }
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
index 98691b9..605bbbb 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
@@ -36,6 +36,7 @@ import 
org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper;
 import org.apache.flink.table.catalog.hive.client.HiveShim;
 import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
 import org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator;
+import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils;
 import org.apache.flink.table.dataformat.BaseRow;
 import org.apache.flink.table.functions.hive.conversion.HiveInspectors;
 import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter;
@@ -52,6 +53,7 @@ import org.apache.flink.util.Preconditions;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
@@ -67,6 +69,7 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.stream.Collectors;
 
 /**
@@ -232,6 +235,8 @@ public class HiveTableSource implements
                        String dbName = tablePath.getDatabaseName();
                        String tableName = tablePath.getObjectName();
                        List<String> partitionColNames = 
catalogTable.getPartitionKeys();
+                       Table hiveTable = client.getTable(dbName, tableName);
+                       Properties tableProps = 
HiveReflectionUtils.getTableMetadata(hiveShim, hiveTable);
                        if (partitionColNames != null && 
partitionColNames.size() > 0) {
                                final String defaultPartitionName = 
jobConf.get(HiveConf.ConfVars.DEFAULTPARTITIONNAME.varname,
                                                
HiveConf.ConfVars.DEFAULTPARTITIONNAME.defaultStrVal);
@@ -260,11 +265,11 @@ public class HiveTableSource implements
                                                }
                                                
partitionColValues.put(partitionColName, partitionObject);
                                        }
-                                       HiveTablePartition hiveTablePartition = 
new HiveTablePartition(sd, partitionColValues);
+                                       HiveTablePartition hiveTablePartition = 
new HiveTablePartition(sd, partitionColValues, tableProps);
                                        
allHivePartitions.add(hiveTablePartition);
                                }
                        } else {
-                               allHivePartitions.add(new 
HiveTablePartition(client.getTable(dbName, tableName).getSd()));
+                               allHivePartitions.add(new 
HiveTablePartition(hiveTable.getSd(), tableProps));
                        }
                } catch (TException e) {
                        throw new FlinkHiveException("Failed to collect all 
partitions from hive metaStore", e);
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveMapredSplitReader.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveMapredSplitReader.java
index 877b52a..8164665 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveMapredSplitReader.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveMapredSplitReader.java
@@ -22,7 +22,6 @@ import 
org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter;
 import org.apache.flink.connectors.hive.FlinkHiveException;
 import org.apache.flink.connectors.hive.HiveTablePartition;
 import org.apache.flink.table.catalog.hive.client.HiveShim;
-import org.apache.flink.table.catalog.hive.util.HiveTableUtil;
 import org.apache.flink.table.dataformat.BaseRow;
 import org.apache.flink.table.dataformat.DataFormatConverters;
 import org.apache.flink.table.dataformat.GenericRow;
@@ -48,7 +47,6 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
-import java.util.Properties;
 
 import static org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR;
 
@@ -122,9 +120,7 @@ public class HiveMapredSplitReader implements SplitReader {
                try {
                        deserializer = (Deserializer) 
Class.forName(sd.getSerdeInfo().getSerializationLib()).newInstance();
                        Configuration conf = new Configuration();
-                       //properties are used to initialize hive Deserializer 
properly.
-                       Properties properties = 
HiveTableUtil.createPropertiesFromStorageDescriptor(sd);
-                       SerDeUtils.initializeSerDe(deserializer, conf, 
properties, null);
+                       SerDeUtils.initializeSerDe(deserializer, conf, 
hiveTablePartition.getTableProps(), null);
                        structObjectInspector = (StructObjectInspector) 
deserializer.getObjectInspector();
                        structFields = 
structObjectInspector.getAllStructFieldRefs();
                } catch (Exception e) {
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java
index d69dd4c..a696708 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java
@@ -33,13 +33,9 @@ import 
org.apache.flink.table.functions.hive.conversion.HiveInspectors;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
 
-import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.SerDeInfo;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
-import org.apache.hadoop.hive.serde.serdeConstants;
-import org.apache.hadoop.hive.serde2.SerDeUtils;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 
 import java.util.ArrayList;
@@ -47,11 +43,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.Properties;
 import java.util.Set;
 
-import static 
org.apache.flink.table.catalog.hive.HiveCatalogConfig.DEFAULT_LIST_COLUMN_TYPES_SEPARATOR;
-
 /**
  * Utils to for Hive-backed table.
  */
@@ -114,34 +107,6 @@ public class HiveTableUtil {
        // 
--------------------------------------------------------------------------------------------
 
        /**
-        * Create properties info to initialize a SerDe.
-        * @param storageDescriptor
-        * @return
-        */
-       public static Properties 
createPropertiesFromStorageDescriptor(StorageDescriptor storageDescriptor) {
-               SerDeInfo serDeInfo = storageDescriptor.getSerdeInfo();
-               Map<String, String> parameters = serDeInfo.getParameters();
-               Properties properties = new Properties();
-               properties.setProperty(
-                               serdeConstants.SERIALIZATION_FORMAT,
-                               
parameters.get(serdeConstants.SERIALIZATION_FORMAT));
-               List<String> colTypes = new ArrayList<>();
-               List<String> colNames = new ArrayList<>();
-               List<FieldSchema> cols = storageDescriptor.getCols();
-               for (FieldSchema col: cols){
-                       colTypes.add(col.getType());
-                       colNames.add(col.getName());
-               }
-               properties.setProperty(serdeConstants.LIST_COLUMNS, 
StringUtils.join(colNames, String.valueOf(SerDeUtils.COMMA)));
-               // Note: serdeConstants.COLUMN_NAME_DELIMITER is not defined in 
previous Hive. We use a literal to save on shim
-               properties.setProperty("column.name.delimite", 
String.valueOf(SerDeUtils.COMMA));
-               properties.setProperty(serdeConstants.LIST_COLUMN_TYPES, 
StringUtils.join(colTypes, DEFAULT_LIST_COLUMN_TYPES_SEPARATOR));
-               
properties.setProperty(serdeConstants.SERIALIZATION_NULL_FORMAT, "NULL");
-               properties.putAll(parameters);
-               return properties;
-       }
-
-       /**
         * Creates a Hive partition instance.
         */
        public static Partition createHivePartition(String dbName, String 
tableName, List<String> values,
diff --git 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
index dc5360e..21b7c60 100644
--- 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
+++ 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
@@ -534,6 +534,24 @@ public class TableEnvHiveConnectorTest {
                }
        }
 
+       @Test
+       public void testRegexSerDe() throws Exception {
+               hiveShell.execute("create database db1");
+               try {
+                       hiveShell.execute("create table db1.src (x int,y 
string) " +
+                                       "row format serde 
'org.apache.hadoop.hive.serde2.RegexSerDe' " +
+                                       "with serdeproperties 
('input.regex'='([\\\\d]+)\\u0001([\\\\S]+)')");
+                       HiveTestUtils.createTextTableInserter(hiveShell, "db1", 
"src")
+                                       .addRow(new Object[]{1, "a"})
+                                       .addRow(new Object[]{2, "ab"})
+                                       .commit();
+                       TableEnvironment tableEnv = 
getTableEnvWithHiveCatalog();
+                       assertEquals("[1,a, 2,ab]", 
TableUtils.collectToList(tableEnv.sqlQuery("select * from db1.src order by 
x")).toString());
+               } finally {
+                       hiveShell.execute("drop database db1 cascade");
+               }
+       }
+
        private TableEnvironment getTableEnvWithHiveCatalog() {
                TableEnvironment tableEnv = 
HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode();
                tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);

Reply via email to