This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new b867750  [FLINK-13023][hive] Generate HiveTableSource from a Hive table
b867750 is described below

commit b867750b4ba8b58767aa600d732fbfa5fd7c64fe
Author: Xuefu Zhang <[email protected]>
AuthorDate: Fri Jun 28 09:50:42 2019 -0700

    [FLINK-13023][hive] Generate HiveTableSource from a Hive table
---
 .../batch/connectors/hive/HiveTableFactory.java    |  7 ++--
 .../connectors/hive/HiveTableInputFormat.java      | 31 ++++++++---------
 .../batch/connectors/hive/HiveTableSource.java     | 40 ++++++++++------------
 .../batch/connectors/hive/HiveInputFormatTest.java |  6 ++--
 .../connectors/hive/HiveTableFactoryTest.java      |  2 ++
 .../batch/connectors/hive/HiveTableSourceTest.java |  6 +++-
 6 files changed, 46 insertions(+), 46 deletions(-)

diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableFactory.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableFactory.java
index fd142da..a22014a 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableFactory.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableFactory.java
@@ -88,8 +88,7 @@ public class HiveTableFactory implements 
TableSourceFactory<Row>, TableSinkFacto
         * Creates and configures a {@link 
org.apache.flink.table.sources.InputFormatTableSource} using the given {@link 
CatalogTable}.
         */
        private InputFormatTableSource<Row> 
createInputFormatTableSource(ObjectPath tablePath, CatalogTable table) {
-               // TODO: create an InputFormatTableSource from a 
HiveCatalogTable instance.
-               return null;
+               return new HiveTableSource(new JobConf(hiveConf), tablePath, 
table);
        }
 
        @Override
@@ -100,7 +99,7 @@ public class HiveTableFactory implements 
TableSourceFactory<Row>, TableSinkFacto
                boolean isGeneric = 
Boolean.valueOf(table.getProperties().get(CatalogConfig.IS_GENERIC));
 
                if (!isGeneric) {
-                       return createOutputFormatTableSink(tablePath, 
(CatalogTableImpl) table);
+                       return createOutputFormatTableSink(tablePath, table);
                } else {
                        return TableFactoryUtil.findAndCreateTableSink(table);
                }
@@ -109,7 +108,7 @@ public class HiveTableFactory implements 
TableSourceFactory<Row>, TableSinkFacto
        /**
         * Creates and configures a {@link 
org.apache.flink.table.sinks.OutputFormatTableSink} using the given {@link 
CatalogTable}.
         */
-       private OutputFormatTableSink<Row> 
createOutputFormatTableSink(ObjectPath tablePath, CatalogTableImpl table) {
+       private OutputFormatTableSink<Row> 
createOutputFormatTableSink(ObjectPath tablePath, CatalogTable table) {
                return new HiveTableSink(new JobConf(hiveConf), tablePath, 
table);
        }
 
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableInputFormat.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableInputFormat.java
index fb99ee4..be6f92a 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableInputFormat.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableInputFormat.java
@@ -26,6 +26,8 @@ import 
org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.hive.util.HiveTableUtil;
 import org.apache.flink.types.Row;
 
@@ -75,11 +77,10 @@ public class HiveTableInputFormat extends 
HadoopInputFormatCommonBase<Row, HiveT
        protected transient boolean fetched = false;
        protected transient boolean hasNext;
 
-       private boolean isPartitioned;
        private RowTypeInfo rowTypeInfo;
 
        //Necessary info to init deserializer
-       private String[] partitionColNames;
+       private List<String> partitionColNames;
        //For non-partition hive table, partitions only contains one partition 
which partitionValues is empty.
        private List<HiveTablePartition> partitions;
        private transient Deserializer deserializer;
@@ -92,16 +93,16 @@ public class HiveTableInputFormat extends 
HadoopInputFormatCommonBase<Row, HiveT
 
        public HiveTableInputFormat(
                        JobConf jobConf,
-                       boolean isPartitioned,
-                       String[] partitionColNames,
-                       List<HiveTablePartition> partitions,
-                       RowTypeInfo rowTypeInfo) {
+                       CatalogTable catalogTable,
+                       List<HiveTablePartition> partitions) {
                super(jobConf.getCredentials());
-               this.rowTypeInfo = checkNotNull(rowTypeInfo, "rowTypeInfo can 
not be null.");
-               this.jobConf = new JobConf(jobConf);
-               this.isPartitioned = isPartitioned;
-               this.partitionColNames = partitionColNames;
+               checkNotNull(catalogTable, "catalogTable can not be null.");
                this.partitions = checkNotNull(partitions, "partitions can not 
be null.");
+
+               this.jobConf = new JobConf(jobConf);
+               this.partitionColNames = catalogTable.getPartitionKeys();
+               TableSchema tableSchema = catalogTable.getSchema();
+               this.rowTypeInfo = new RowTypeInfo(tableSchema.getFieldTypes(), 
tableSchema.getFieldNames());
        }
 
        @Override
@@ -221,10 +222,8 @@ public class HiveTableInputFormat extends 
HadoopInputFormatCommonBase<Row, HiveT
                                                
structObjectInspector.getStructFieldData(hiveRowStruct, structField), 
structField.getFieldObjectInspector());
                                row.setField(index, object);
                        }
-                       if (isPartitioned) {
-                               for (String partition : partitionColNames){
-                                       row.setField(index++, 
hiveTablePartition.getPartitionSpec().get(partition));
-                               }
+                       for (String partition : partitionColNames){
+                               row.setField(index++, 
hiveTablePartition.getPartitionSpec().get(partition));
                        }
                } catch (Exception e){
                        logger.error("Error happens when converting hive data 
type to flink data type.");
@@ -246,7 +245,6 @@ public class HiveTableInputFormat extends 
HadoopInputFormatCommonBase<Row, HiveT
        private void writeObject(ObjectOutputStream out) throws IOException {
                super.write(out);
                jobConf.write(out);
-               out.writeObject(isPartitioned);
                out.writeObject(rowTypeInfo);
                out.writeObject(partitionColNames);
                out.writeObject(partitions);
@@ -264,9 +262,8 @@ public class HiveTableInputFormat extends 
HadoopInputFormatCommonBase<Row, HiveT
                if (currentUserCreds != null) {
                        jobConf.getCredentials().addAll(currentUserCreds);
                }
-               isPartitioned = (boolean) in.readObject();
                rowTypeInfo = (RowTypeInfo) in.readObject();
-               partitionColNames = (String[]) in.readObject();
+               partitionColNames = (List<String>) in.readObject();
                partitions = (List<HiveTablePartition>) in.readObject();
        }
 }
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableSource.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableSource.java
index 279fcb5..19e46de 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableSource.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableSource.java
@@ -19,9 +19,10 @@
 package org.apache.flink.batch.connectors.hive;
 
 import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory;
 import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper;
 import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
@@ -52,42 +53,33 @@ public class HiveTableSource extends 
InputFormatTableSource<Row> {
 
        private static Logger logger = 
LoggerFactory.getLogger(HiveTableSource.class);
 
-       private final TableSchema tableSchema;
        private final JobConf jobConf;
-       private final String dbName;
-       private final String tableName;
-       private final Boolean isPartitionTable;
-       private final String[] partitionColNames;
+       private final ObjectPath tablePath;
+       private final CatalogTable catalogTable;
        private List<HiveTablePartition> allPartitions;
        private String hiveVersion;
 
-       public HiveTableSource(TableSchema tableSchema,
-                                               JobConf jobConf,
-                                               String dbName,
-                                               String tableName,
-                                               String[] partitionColNames) {
-               this.tableSchema = tableSchema;
+       public HiveTableSource(JobConf jobConf, ObjectPath tablePath, 
CatalogTable catalogTable) {
                this.jobConf = jobConf;
-               this.dbName = dbName;
-               this.tableName = tableName;
-               this.isPartitionTable = (null != partitionColNames && 
partitionColNames.length != 0);
-               this.partitionColNames = partitionColNames;
+               this.tablePath = tablePath;
+               this.catalogTable = catalogTable;
                this.hiveVersion = 
jobConf.get(HiveCatalogValidator.CATALOG_HIVE_VERSION, 
HiveShimLoader.getHiveVersion());
        }
 
        @Override
        public InputFormat getInputFormat() {
                initAllPartitions();
-               return new HiveTableInputFormat(jobConf, isPartitionTable, 
partitionColNames, allPartitions, new RowTypeInfo(tableSchema.getFieldTypes(), 
tableSchema.getFieldNames()));
+               return new HiveTableInputFormat(jobConf, catalogTable, 
allPartitions);
        }
 
        @Override
        public TableSchema getTableSchema() {
-               return tableSchema;
+               return catalogTable.getSchema();
        }
 
        @Override
        public DataType getProducedDataType() {
+               TableSchema tableSchema = catalogTable.getSchema();
                DataTypes.Field[] fields = new 
DataTypes.Field[tableSchema.getFieldCount()];
                for (int i = 0; i < fields.length; i++) {
                        fields[i] = 
DataTypes.FIELD(tableSchema.getFieldName(i).get(), 
tableSchema.getFieldDataType(i).get());
@@ -101,17 +93,21 @@ public class HiveTableSource extends 
InputFormatTableSource<Row> {
                // Ideally, we need to go thru Catalog API to get all info we 
need here, which requires some major
                // refactoring. We will postpone this until we merge Blink to 
Flink.
                try (HiveMetastoreClientWrapper client = 
HiveMetastoreClientFactory.create(new HiveConf(jobConf, HiveConf.class), 
hiveVersion)) {
-                       if (isPartitionTable) {
+                       String dbName = tablePath.getDatabaseName();
+                       String tableName = tablePath.getObjectName();
+                       List<String> partitionColNames = 
catalogTable.getPartitionKeys();
+                       if (partitionColNames != null && 
partitionColNames.size() > 0) {
                                List<Partition> partitions =
                                                client.listPartitions(dbName, 
tableName, (short) -1);
                                for (Partition partition : partitions) {
                                        StorageDescriptor sd = 
partition.getSd();
                                        Map<String, Object> partitionColValues 
= new HashMap<>();
-                                       for (int i = 0; i < 
partitionColNames.length; i++) {
+                                       for (int i = 0; i < 
partitionColNames.size(); i++) {
+                                               String partitionColName = 
partitionColNames.get(i);
                                                String partitionValue = 
partition.getValues().get(i);
-                                               DataType type = 
tableSchema.getFieldDataType(partitionColNames[i]).get();
+                                               DataType type = 
catalogTable.getSchema().getFieldDataType(partitionColName).get();
                                                Object partitionObject = 
restorePartitionValueFromFromType(partitionValue, type);
-                                               
partitionColValues.put(partitionColNames[i], partitionObject);
+                                               
partitionColValues.put(partitionColName, partitionObject);
                                        }
                                        allPartitions.add(new 
HiveTablePartition(sd, partitionColValues));
                                }
diff --git 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveInputFormatTest.java
 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveInputFormatTest.java
index 3f9445c..5560872 100644
--- 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveInputFormatTest.java
+++ 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveInputFormatTest.java
@@ -24,6 +24,8 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.hive.HiveCatalog;
 import org.apache.flink.table.catalog.hive.HiveTestUtils;
 import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory;
@@ -112,8 +114,8 @@ public class HiveInputFormatTest {
                RowTypeInfo rowTypeInfo = new 
RowTypeInfo(tableSchema.getFieldTypes(), tableSchema.getFieldNames());
                List<HiveTablePartition> partitions = new ArrayList<>();
                partitions.add(new HiveTablePartition(sd, new HashMap<>()));
-               HiveTableInputFormat hiveTableInputFormat = new 
HiveTableInputFormat(new JobConf(hiveConf), false, null,
-                                                                               
                                                                        
partitions, rowTypeInfo);
+               CatalogTable catalogTable = (CatalogTable) 
hiveCatalog.getTable(new ObjectPath(dbName, tblName));
+               HiveTableInputFormat hiveTableInputFormat = new 
HiveTableInputFormat(new JobConf(hiveConf), catalogTable, partitions);
                DataSet<Row> rowDataSet = env.createInput(hiveTableInputFormat);
                List<Row> rows = rowDataSet.collect();
                Assert.assertEquals(4, rows.size());
diff --git 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableFactoryTest.java
 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableFactoryTest.java
index 4622322..237146d 100644
--- 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableFactoryTest.java
+++ 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableFactoryTest.java
@@ -112,6 +112,8 @@ public class HiveTableFactoryTest {
                HiveTableFactory tableFactory = (HiveTableFactory) opt.get();
                TableSink tableSink = tableFactory.createTableSink(path, table);
                assertTrue(tableSink instanceof HiveTableSink);
+               TableSource tableSource = tableFactory.createTableSource(path, 
table);
+               assertTrue(tableSource instanceof HiveTableSource);
        }
 
 }
diff --git 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSourceTest.java
 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSourceTest.java
index 1a8807f..d6cf124 100644
--- 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSourceTest.java
+++ 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSourceTest.java
@@ -26,6 +26,8 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.hive.HiveCatalog;
 import org.apache.flink.table.catalog.hive.HiveTestUtils;
 import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory;
@@ -110,7 +112,9 @@ public class HiveTableSourceTest {
                client.createTable(tbl);
                ExecutionEnvironment execEnv = 
ExecutionEnvironment.createLocalEnvironment(1);
                BatchTableEnvironment tEnv = 
BatchTableEnvironment.create(execEnv);
-               HiveTableSource hiveTableSource = new 
HiveTableSource(tableSchema, new JobConf(hiveConf), dbName, tblName, null);
+               ObjectPath tablePath = new ObjectPath(dbName, tblName);
+               CatalogTable catalogTable = (CatalogTable) 
hiveCatalog.getTable(tablePath);
+               HiveTableSource hiveTableSource = new HiveTableSource(new 
JobConf(hiveConf), tablePath, catalogTable);
                Table src = tEnv.fromTableSource(hiveTableSource);
                DataSet<Row> rowDataSet = tEnv.toDataSet(src, new 
RowTypeInfo(tableSchema.getFieldTypes(),
                                                                                
                                                        
tableSchema.getFieldNames()));

Reply via email to