This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new f172129  [FLINK-12989][hive]: Generate HiveTableSink from from a Hive 
table
f172129 is described below

commit f1721293b0701d584d42bd68671181e332d2ad04
Author: Xuefu Zhang <[email protected]>
AuthorDate: Wed Jun 26 11:56:11 2019 -0700

    [FLINK-12989][hive]: Generate HiveTableSink from from a Hive table
    
    This PR adds the generation of HiveTableSink from from a Hive table.
    
    This closes #8890.
---
 .../batch/connectors/hive/HiveTableFactory.java    | 26 +++++++++++-----
 .../connectors/hive/HiveTableOutputFormat.java     | 36 ++++++++++------------
 .../flink/batch/connectors/hive/HiveTableSink.java | 33 ++++++++++++--------
 .../flink/table/catalog/hive/HiveCatalog.java      |  2 +-
 .../connectors/hive/HiveTableFactoryTest.java      | 27 +++++++++++++---
 .../connectors/hive/HiveTableOutputFormatTest.java | 26 ++++++++--------
 .../batch/connectors/hive/HiveTableSinkTest.java   | 16 +++++-----
 .../table/catalog/GenericInMemoryCatalogTest.java  |  8 -----
 .../org/apache/flink/table/catalog/ObjectPath.java |  4 ++-
 .../flink/table/factories/TableSinkFactory.java    |  4 ++-
 .../flink/table/factories/TableSourceFactory.java  |  4 ++-
 .../flink/table/catalog/DatabaseCalciteSchema.java |  6 ++--
 12 files changed, 113 insertions(+), 79 deletions(-)

diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableFactory.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableFactory.java
index f3c25ec..fd142da 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableFactory.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableFactory.java
@@ -20,6 +20,7 @@ package org.apache.flink.batch.connectors.hive;
 
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.config.CatalogConfig;
 import org.apache.flink.table.factories.TableFactoryUtil;
 import org.apache.flink.table.factories.TableSinkFactory;
@@ -31,13 +32,23 @@ import org.apache.flink.table.sources.TableSource;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
 
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.mapred.JobConf;
+
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * A table factory implementation for tables stored in Hive catalog.
  */
 public class HiveTableFactory implements TableSourceFactory<Row>, 
TableSinkFactory<Row> {
+       private HiveConf hiveConf;
+
+       public HiveTableFactory(HiveConf hiveConf) {
+               this.hiveConf = checkNotNull(hiveConf, "hiveConf cannot be 
null");
+       }
 
        @Override
        public Map<String, String> requiredContext() {
@@ -60,14 +71,14 @@ public class HiveTableFactory implements 
TableSourceFactory<Row>, TableSinkFacto
        }
 
        @Override
-       public TableSource<Row> createTableSource(CatalogTable table) {
+       public TableSource<Row> createTableSource(ObjectPath tablePath, 
CatalogTable table) {
                Preconditions.checkNotNull(table);
                Preconditions.checkArgument(table instanceof CatalogTableImpl);
 
                boolean isGeneric = 
Boolean.valueOf(table.getProperties().get(CatalogConfig.IS_GENERIC));
 
                if (!isGeneric) {
-                       return createInputFormatTableSource(table);
+                       return createInputFormatTableSource(tablePath, table);
                } else {
                        return TableFactoryUtil.findAndCreateTableSource(table);
                }
@@ -76,20 +87,20 @@ public class HiveTableFactory implements 
TableSourceFactory<Row>, TableSinkFacto
        /**
         * Creates and configures a {@link 
org.apache.flink.table.sources.InputFormatTableSource} using the given {@link 
CatalogTable}.
         */
-       private InputFormatTableSource<Row> 
createInputFormatTableSource(CatalogTable table) {
+       private InputFormatTableSource<Row> 
createInputFormatTableSource(ObjectPath tablePath, CatalogTable table) {
                // TODO: create an InputFormatTableSource from a 
HiveCatalogTable instance.
                return null;
        }
 
        @Override
-       public TableSink<Row> createTableSink(CatalogTable table) {
+       public TableSink<Row> createTableSink(ObjectPath tablePath, 
CatalogTable table) {
                Preconditions.checkNotNull(table);
                Preconditions.checkArgument(table instanceof CatalogTableImpl);
 
                boolean isGeneric = 
Boolean.valueOf(table.getProperties().get(CatalogConfig.IS_GENERIC));
 
                if (!isGeneric) {
-                       return createOutputFormatTableSink(table);
+                       return createOutputFormatTableSink(tablePath, 
(CatalogTableImpl) table);
                } else {
                        return TableFactoryUtil.findAndCreateTableSink(table);
                }
@@ -98,9 +109,8 @@ public class HiveTableFactory implements 
TableSourceFactory<Row>, TableSinkFacto
        /**
         * Creates and configures a {@link 
org.apache.flink.table.sinks.OutputFormatTableSink} using the given {@link 
CatalogTable}.
         */
-       private OutputFormatTableSink<Row> 
createOutputFormatTableSink(CatalogTable table) {
-               // TODO: create an outputFormatTableSink from a 
HiveCatalogTable instance.
-               return null;
+       private OutputFormatTableSink<Row> 
createOutputFormatTableSink(ObjectPath tablePath, CatalogTableImpl table) {
+               return new HiveTableSink(new JobConf(hiveConf), tablePath, 
table);
        }
 
 }
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormat.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormat.java
index a0fdf56..898ab53 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormat.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormat.java
@@ -25,6 +25,9 @@ import 
org.apache.flink.api.java.hadoop.common.HadoopOutputFormatCommonBase;
 import org.apache.flink.api.java.hadoop.mapreduce.utils.HadoopUtils;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory;
 import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper;
@@ -108,8 +111,7 @@ public class HiveTableOutputFormat extends 
HadoopOutputFormatCommonBase<Row> imp
        private static final long serialVersionUID = 5167529504848109023L;
 
        private transient JobConf jobConf;
-       private transient String databaseName;
-       private transient String tableName;
+       private transient ObjectPath tablePath;
        private transient List<String> partitionColumns;
        private transient RowTypeInfo rowTypeInfo;
        private transient HiveTablePartition hiveTablePartition;
@@ -139,23 +141,20 @@ public class HiveTableOutputFormat extends 
HadoopOutputFormatCommonBase<Row> imp
        // to convert Flink object to Hive object
        private transient HiveObjectConversion[] hiveConversions;
 
-       public HiveTableOutputFormat(JobConf jobConf, String databaseName, 
String tableName, List<String> partitionColumns,
-                                                               RowTypeInfo 
rowTypeInfo, HiveTablePartition hiveTablePartition,
+       public HiveTableOutputFormat(JobConf jobConf, ObjectPath tablePath, 
CatalogTable table, HiveTablePartition hiveTablePartition,
                                                                Properties 
tableProperties, boolean overwrite) {
                super(jobConf.getCredentials());
 
-               
Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName), 
"DB name is empty");
-               
Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(tableName), 
"Table name is empty");
-               Preconditions.checkNotNull(rowTypeInfo, "RowTypeInfo cannot be 
null");
+               Preconditions.checkNotNull(table, "table cannot be null");
                Preconditions.checkNotNull(hiveTablePartition, 
"HiveTablePartition cannot be null");
                Preconditions.checkNotNull(tableProperties, "Table properties 
cannot be null");
 
                HadoopUtils.mergeHadoopConf(jobConf);
                this.jobConf = jobConf;
-               this.databaseName = databaseName;
-               this.tableName = tableName;
-               this.partitionColumns = partitionColumns;
-               this.rowTypeInfo = rowTypeInfo;
+               this.tablePath = tablePath;
+               this.partitionColumns = table.getPartitionKeys();
+               TableSchema tableSchema = table.getSchema();
+               this.rowTypeInfo = new RowTypeInfo(tableSchema.getFieldTypes(), 
tableSchema.getFieldNames());
                this.hiveTablePartition = hiveTablePartition;
                this.tableProperties = tableProperties;
                this.overwrite = overwrite;
@@ -174,8 +173,7 @@ public class HiveTableOutputFormat extends 
HadoopOutputFormatCommonBase<Row> imp
                out.writeObject(rowTypeInfo);
                out.writeObject(hiveTablePartition);
                out.writeObject(partitionColumns);
-               out.writeObject(databaseName);
-               out.writeObject(tableName);
+               out.writeObject(tablePath);
                out.writeObject(tableProperties);
        }
 
@@ -197,8 +195,7 @@ public class HiveTableOutputFormat extends 
HadoopOutputFormatCommonBase<Row> imp
                rowTypeInfo = (RowTypeInfo) in.readObject();
                hiveTablePartition = (HiveTablePartition) in.readObject();
                partitionColumns = (List<String>) in.readObject();
-               databaseName = (String) in.readObject();
-               tableName = (String) in.readObject();
+               tablePath = (ObjectPath) in.readObject();
                partitionToWriter = new HashMap<>();
                tableProperties = (Properties) in.readObject();
        }
@@ -209,7 +206,7 @@ public class HiveTableOutputFormat extends 
HadoopOutputFormatCommonBase<Row> imp
                Path stagingDir = new Path(jobSD.getLocation());
                FileSystem fs = stagingDir.getFileSystem(jobConf);
                try (HiveMetastoreClientWrapper client = 
HiveMetastoreClientFactory.create(new HiveConf(jobConf, HiveConf.class), 
hiveVersion)) {
-                       Table table = client.getTable(databaseName, tableName);
+                       Table table = 
client.getTable(tablePath.getDatabaseName(), tablePath.getObjectName());
                        if (!isDynamicPartition) {
                                commitJob(stagingDir.toString());
                        }
@@ -336,8 +333,9 @@ public class HiveTableOutputFormat extends 
HadoopOutputFormatCommonBase<Row> imp
        private void loadPartition(Path srcDir, Table table, Map<String, 
String> partSpec, HiveMetastoreClientWrapper client)
                        throws TException, IOException {
                Path tblLocation = new Path(table.getSd().getLocation());
-               List<Partition> existingPart = 
client.listPartitions(databaseName, tableName,
-                               new ArrayList<>(partSpec.values()), (short) 1);
+               String dbName = tablePath.getDatabaseName();
+               String tableName = tablePath.getObjectName();
+               List<Partition> existingPart = client.listPartitions(dbName, 
tableName, new ArrayList<>(partSpec.values()), (short) 1);
                Path destDir = existingPart.isEmpty() ? new Path(tblLocation, 
Warehouse.makePartPath(partSpec)) :
                                new 
Path(existingPart.get(0).getSd().getLocation());
                moveFiles(srcDir, destDir);
@@ -345,7 +343,7 @@ public class HiveTableOutputFormat extends 
HadoopOutputFormatCommonBase<Row> imp
                if (existingPart.isEmpty()) {
                        StorageDescriptor sd = new 
StorageDescriptor(hiveTablePartition.getStorageDescriptor());
                        sd.setLocation(destDir.toString());
-                       Partition partition = 
HiveTableUtil.createHivePartition(databaseName, tableName,
+                       Partition partition = 
HiveTableUtil.createHivePartition(dbName, tableName,
                                        new ArrayList<>(partSpec.values()), sd, 
new HashMap<>());
                        partition.setValues(new ArrayList<>(partSpec.values()));
                        client.add_partition(partition);
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableSink.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableSink.java
index 7b318cc..ab077d5 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableSink.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableSink.java
@@ -21,6 +21,9 @@ package org.apache.flink.batch.connectors.hive;
 import org.apache.flink.api.common.io.OutputFormat;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory;
 import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper;
@@ -55,30 +58,31 @@ import java.util.List;
 public class HiveTableSink extends OutputFormatTableSink<Row> {
 
        private final JobConf jobConf;
+       private final CatalogTable catalogTable;
+       private final ObjectPath tablePath;
        private final RowTypeInfo rowTypeInfo;
-       private final String dbName;
-       private final String tableName;
-       private final List<String> partitionColumns;
        private final String hiveVersion;
 
        // TODO: need OverwritableTableSink to configure this
        private boolean overwrite = false;
 
-       public HiveTableSink(JobConf jobConf, RowTypeInfo rowTypeInfo, String 
dbName, String tableName,
-                       List<String> partitionColumns) {
+       public HiveTableSink(JobConf jobConf, ObjectPath tablePath, 
CatalogTable table) {
                this.jobConf = jobConf;
-               this.rowTypeInfo = rowTypeInfo;
-               this.dbName = dbName;
-               this.tableName = tableName;
-               this.partitionColumns = partitionColumns;
+               this.tablePath = tablePath;
+               this.catalogTable = table;
                hiveVersion = 
jobConf.get(HiveCatalogValidator.CATALOG_HIVE_VERSION, 
HiveShimLoader.getHiveVersion());
+               TableSchema tableSchema = table.getSchema();
+               rowTypeInfo = new RowTypeInfo(tableSchema.getFieldTypes(), 
tableSchema.getFieldNames());
        }
 
        @Override
        public OutputFormat<Row> getOutputFormat() {
+               List<String> partitionColumns = catalogTable.getPartitionKeys();
                boolean isPartitioned = partitionColumns != null && 
!partitionColumns.isEmpty();
                // TODO: need PartitionableTableSink to decide whether it's 
dynamic partitioning
                boolean isDynamicPartition = isPartitioned;
+               String dbName = tablePath.getDatabaseName();
+               String tableName = tablePath.getObjectName();
                try (HiveMetastoreClientWrapper client = 
HiveMetastoreClientFactory.create(new HiveConf(jobConf, HiveConf.class), 
hiveVersion)) {
                        Table table = client.getTable(dbName, tableName);
                        StorageDescriptor sd = table.getSd();
@@ -109,8 +113,13 @@ public class HiveTableSink extends 
OutputFormatTableSink<Row> {
                                sd.setLocation(toStagingDir(sdLocation, 
jobConf));
                                hiveTablePartition = new HiveTablePartition(sd, 
null);
                        }
-                       return new HiveTableOutputFormat(jobConf, dbName, 
tableName,
-                                       partitionColumns, rowTypeInfo, 
hiveTablePartition, MetaStoreUtils.getTableMetadata(table), overwrite);
+                       return new HiveTableOutputFormat(
+                               jobConf,
+                               tablePath,
+                               catalogTable,
+                               hiveTablePartition,
+                               MetaStoreUtils.getTableMetadata(table),
+                               overwrite);
                } catch (TException e) {
                        throw new CatalogException("Failed to query Hive 
metaStore", e);
                } catch (IOException e) {
@@ -120,7 +129,7 @@ public class HiveTableSink extends 
OutputFormatTableSink<Row> {
 
        @Override
        public TableSink<Row> configure(String[] fieldNames, 
TypeInformation<?>[] fieldTypes) {
-               return new HiveTableSink(jobConf, new RowTypeInfo(fieldTypes, 
fieldNames), dbName, tableName, partitionColumns);
+               return new HiveTableSink(jobConf, tablePath, catalogTable);
        }
 
        @Override
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
index 1a1a312..e8c4342 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -177,7 +177,7 @@ public class HiveCatalog extends AbstractCatalog {
 
        @Override
        public Optional<TableFactory> getTableFactory() {
-               return Optional.of(new HiveTableFactory());
+               return Optional.of(new HiveTableFactory(hiveConf));
        }
 
        // ------ databases ------
diff --git 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableFactoryTest.java
 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableFactoryTest.java
index 6d6847a..4622322 100644
--- 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableFactoryTest.java
+++ 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableFactoryTest.java
@@ -81,18 +81,37 @@ public class HiveTableFactoryTest {
                properties.put("format.fields.1.name", "age");
                properties.put("format.fields.1.type", "INT");
 
-               CatalogTable table = new CatalogTableImpl(schema, properties, 
"csv table");
-
                catalog.createDatabase("mydb", new CatalogDatabaseImpl(new 
HashMap<>(), ""), true);
                ObjectPath path = new ObjectPath("mydb", "mytable");
+               CatalogTable table = new CatalogTableImpl(schema, properties, 
"csv table");
                catalog.createTable(path, table, true);
                Optional<TableFactory> opt = catalog.getTableFactory();
                assertTrue(opt.isPresent());
                HiveTableFactory tableFactory = (HiveTableFactory) opt.get();
-               TableSource tableSource = tableFactory.createTableSource(table);
+               TableSource tableSource = tableFactory.createTableSource(path, 
table);
                assertTrue(tableSource instanceof StreamTableSource);
-               TableSink tableSink = tableFactory.createTableSink(table);
+               TableSink tableSink = tableFactory.createTableSink(path, table);
                assertTrue(tableSink instanceof StreamTableSink);
        }
 
+       @Test
+       public void testHiveTable() throws Exception {
+               TableSchema schema = TableSchema.builder()
+                       .field("name", DataTypes.STRING())
+                       .field("age", DataTypes.INT())
+                       .build();
+
+               Map<String, String> properties = new HashMap<>();
+
+               catalog.createDatabase("mydb", new CatalogDatabaseImpl(new 
HashMap<>(), ""), true);
+               ObjectPath path = new ObjectPath("mydb", "mytable");
+               CatalogTable table = new CatalogTableImpl(schema, properties, 
"hive table");
+               catalog.createTable(path, table, true);
+               Optional<TableFactory> opt = catalog.getTableFactory();
+               assertTrue(opt.isPresent());
+               HiveTableFactory tableFactory = (HiveTableFactory) opt.get();
+               TableSink tableSink = tableFactory.createTableSink(path, table);
+               assertTrue(tableSink instanceof HiveTableSink);
+       }
+
 }
diff --git 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormatTest.java
 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormatTest.java
index aa6c874..206f831 100644
--- 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormatTest.java
+++ 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormatTest.java
@@ -20,8 +20,8 @@ package org.apache.flink.batch.connectors.hive;
 
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.catalog.CatalogPartitionSpec;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.CatalogTableImpl;
@@ -82,12 +82,13 @@ public class HiveTableOutputFormatTest {
        public void testInsertOverwrite() throws Exception {
                String dbName = "default";
                String tblName = "dest";
-               RowTypeInfo rowTypeInfo = createDestTable(dbName, tblName, 0);
+               createDestTable(dbName, tblName, 0);
                ObjectPath tablePath = new ObjectPath(dbName, tblName);
+               CatalogBaseTable table = hiveCatalog.getTable(tablePath);
                Table hiveTable = hiveCatalog.getHiveTable(tablePath);
 
                // write some data and verify
-               HiveTableOutputFormat outputFormat = 
createHiveTableOutputFormat(tablePath, hiveTable, rowTypeInfo, null, false);
+               HiveTableOutputFormat outputFormat = 
createHiveTableOutputFormat(tablePath, (CatalogTableImpl) table, hiveTable, 
null, false);
                outputFormat.open(0, 1);
                List<Row> toWrite = generateRecords(5);
                writeRecords(toWrite, outputFormat);
@@ -96,7 +97,7 @@ public class HiveTableOutputFormatTest {
                verifyWrittenData(new Path(hiveTable.getSd().getLocation(), 
"0"), toWrite, 0);
 
                // write some data to overwrite existing data and verify
-               outputFormat = createHiveTableOutputFormat(tablePath, 
hiveTable, rowTypeInfo, null, true);
+               outputFormat = createHiveTableOutputFormat(tablePath, 
(CatalogTableImpl) table, hiveTable, null, true);
                outputFormat.open(0, 1);
                toWrite = generateRecords(3);
                writeRecords(toWrite, outputFormat);
@@ -111,13 +112,14 @@ public class HiveTableOutputFormatTest {
        public void testInsertIntoStaticPartition() throws Exception {
                String dbName = "default";
                String tblName = "dest";
-               RowTypeInfo rowTypeInfo = createDestTable(dbName, tblName, 1);
+               createDestTable(dbName, tblName, 1);
                ObjectPath tablePath = new ObjectPath(dbName, tblName);
                Table hiveTable = hiveCatalog.getHiveTable(tablePath);
+               CatalogBaseTable table = hiveCatalog.getTable(tablePath);
 
                Map<String, Object> partSpec = new HashMap<>();
                partSpec.put("s", "a");
-               HiveTableOutputFormat outputFormat = 
createHiveTableOutputFormat(tablePath, hiveTable, rowTypeInfo, partSpec, false);
+               HiveTableOutputFormat outputFormat = 
createHiveTableOutputFormat(tablePath, (CatalogTableImpl) table, hiveTable, 
partSpec, false);
                outputFormat.open(0, 1);
                List<Row> toWrite = generateRecords(1);
                writeRecords(toWrite, outputFormat);
@@ -133,7 +135,7 @@ public class HiveTableOutputFormatTest {
                hiveCatalog.dropTable(tablePath, false);
        }
 
-       private RowTypeInfo createDestTable(String dbName, String tblName, int 
numPartCols) throws Exception {
+       private void createDestTable(String dbName, String tblName, int 
numPartCols) throws Exception {
                ObjectPath tablePath = new ObjectPath(dbName, tblName);
                TableSchema tableSchema = new TableSchema(
                                new String[]{"i", "l", "d", "s"},
@@ -145,7 +147,6 @@ public class HiveTableOutputFormatTest {
                );
                CatalogTable catalogTable = createCatalogTable(tableSchema, 
numPartCols);
                hiveCatalog.createTable(tablePath, catalogTable, false);
-               return new RowTypeInfo(tableSchema.getFieldTypes(), 
tableSchema.getFieldNames());
        }
 
        private CatalogTable createCatalogTable(TableSchema tableSchema, int 
numPartCols) {
@@ -157,15 +158,14 @@ public class HiveTableOutputFormatTest {
                return new CatalogTableImpl(tableSchema, 
Arrays.asList(partCols), new HashMap<>(), "");
        }
 
-       private HiveTableOutputFormat createHiveTableOutputFormat(ObjectPath 
tablePath, Table hiveTable,
-                       RowTypeInfo rowTypeInfo, Map<String, Object> partSpec, 
boolean overwrite) throws Exception {
+       private HiveTableOutputFormat createHiveTableOutputFormat(ObjectPath 
tablePath, CatalogTable catalogTable, Table hiveTable,
+                       Map<String, Object> partSpec, boolean overwrite) throws 
Exception {
                StorageDescriptor jobSD = hiveTable.getSd().deepCopy();
                jobSD.setLocation(hiveTable.getSd().getLocation() + 
"/.staging");
                HiveTablePartition hiveTablePartition = new 
HiveTablePartition(jobSD, partSpec);
                JobConf jobConf = new JobConf(hiveConf);
-               return new HiveTableOutputFormat(jobConf, 
tablePath.getDatabaseName(), tablePath.getObjectName(),
-                               
HiveCatalog.getFieldNames(hiveTable.getPartitionKeys()), rowTypeInfo, 
hiveTablePartition,
-                               MetaStoreUtils.getTableMetadata(hiveTable), 
overwrite);
+               return new HiveTableOutputFormat(jobConf, tablePath, 
catalogTable, hiveTablePartition,
+                       MetaStoreUtils.getTableMetadata(hiveTable), overwrite);
        }
 
        private void verifyWrittenData(Path outputFile, List<Row> expected, int 
numPartCols) throws Exception {
diff --git 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSinkTest.java
 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSinkTest.java
index b8f68a4..6b25adb 100644
--- 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSinkTest.java
+++ 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSinkTest.java
@@ -91,8 +91,8 @@ public class HiveTableSinkTest {
                tableEnv.registerDataSet("src", execEnv.fromCollection(toWrite, 
rowTypeInfo));
 
                Table hiveTable = hiveCatalog.getHiveTable(tablePath);
-               tableEnv.registerTableSink("destSink", new HiveTableSink(new 
JobConf(hiveConf), rowTypeInfo,
-                               "default", "dest", 
HiveCatalog.getFieldNames(hiveTable.getPartitionKeys())));
+               CatalogTable table = (CatalogTable) 
hiveCatalog.getTable(tablePath);
+               tableEnv.registerTableSink("destSink", new HiveTableSink(new 
JobConf(hiveConf), tablePath, table));
                tableEnv.sqlQuery("select * from src").insertInto("destSink");
                execEnv.execute();
 
@@ -113,8 +113,8 @@ public class HiveTableSinkTest {
                tableEnv.registerDataSet("src", execEnv.fromCollection(toWrite, 
rowTypeInfo));
 
                Table hiveTable = hiveCatalog.getHiveTable(tablePath);
-               tableEnv.registerTableSink("destSink", new HiveTableSink(new 
JobConf(hiveConf), rowTypeInfo,
-                               "default", "dest", 
HiveCatalog.getFieldNames(hiveTable.getPartitionKeys())));
+               CatalogTable table = (CatalogTable) 
hiveCatalog.getTable(tablePath);
+               tableEnv.registerTableSink("destSink", new HiveTableSink(new 
JobConf(hiveConf), tablePath, table));
                tableEnv.sqlQuery("select * from src").insertInto("destSink");
                execEnv.execute();
 
@@ -162,8 +162,8 @@ public class HiveTableSinkTest {
                tableEnv.registerDataSet("complexSrc", 
execEnv.fromCollection(toWrite, rowTypeInfo));
 
                Table hiveTable = hiveCatalog.getHiveTable(tablePath);
-               tableEnv.registerTableSink("complexSink", new HiveTableSink(new 
JobConf(hiveConf), rowTypeInfo,
-                               "default", "dest", 
HiveCatalog.getFieldNames(hiveTable.getPartitionKeys())));
+               CatalogTable catalogTable = (CatalogTable) 
hiveCatalog.getTable(tablePath);
+               tableEnv.registerTableSink("complexSink", new HiveTableSink(new 
JobConf(hiveConf), tablePath, catalogTable));
                tableEnv.sqlQuery("select * from 
complexSrc").insertInto("complexSink");
                execEnv.execute();
 
@@ -190,8 +190,8 @@ public class HiveTableSinkTest {
 
                tableEnv.registerDataSet("nestedSrc", 
execEnv.fromCollection(toWrite, rowTypeInfo));
                hiveTable = hiveCatalog.getHiveTable(tablePath);
-               tableEnv.registerTableSink("nestedSink", new HiveTableSink(new 
JobConf(hiveConf), rowTypeInfo,
-                               "default", "dest", 
HiveCatalog.getFieldNames(hiveTable.getPartitionKeys())));
+               catalogTable = (CatalogTable) hiveCatalog.getTable(tablePath);
+               tableEnv.registerTableSink("nestedSink", new HiveTableSink(new 
JobConf(hiveConf), tablePath, catalogTable));
                tableEnv.sqlQuery("select * from 
nestedSrc").insertInto("nestedSink");
                execEnv.execute();
 
diff --git 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
index 950115c..e401223 100644
--- 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
+++ 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
@@ -158,14 +158,6 @@ public class GenericInMemoryCatalogTest extends 
CatalogTestBase {
        }
 
        @Override
-       public CatalogTable createStreamingTable() {
-               return new CatalogTableImpl(
-                       createTableSchema(),
-                       getStreamingTableProperties(),
-                       TEST_COMMENT);
-       }
-
-       @Override
        public CatalogPartition createPartition() {
                return new GenericCatalogPartition(getBatchTableProperties(), 
"Generic batch table");
        }
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ObjectPath.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ObjectPath.java
index d931651..d096e26 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ObjectPath.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ObjectPath.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.catalog;
 
 import org.apache.flink.util.StringUtils;
 
+import java.io.Serializable;
 import java.util.Objects;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -27,7 +28,7 @@ import static 
org.apache.flink.util.Preconditions.checkArgument;
 /**
  * A database name and object (table/view/function) name combo in a catalog.
  */
-public class ObjectPath {
+public class ObjectPath implements Serializable {
        private final String databaseName;
        private final String objectName;
 
@@ -89,4 +90,5 @@ public class ObjectPath {
        public String toString() {
                return String.format("%s.%s", databaseName, objectName);
        }
+
 }
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableSinkFactory.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableSinkFactory.java
index 7f79b77..05d0f34 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableSinkFactory.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableSinkFactory.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.factories;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.sinks.TableSink;
 
 import java.util.Map;
@@ -44,10 +45,11 @@ public interface TableSinkFactory<T> extends TableFactory {
        /**
         * Creates and configures a {@link TableSink} based on the given {@link 
CatalogTable} instance.
         *
+        * @param tablePath path of the given {@link CatalogTable}
         * @param table {@link CatalogTable} instance.
         * @return the configured table sink.
         */
-       default TableSink<T> createTableSink(CatalogTable table) {
+       default TableSink<T> createTableSink(ObjectPath tablePath, CatalogTable 
table) {
                return createTableSink(table.toProperties());
        }
 
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableSourceFactory.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableSourceFactory.java
index 0cf150e..c0f97d9 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableSourceFactory.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableSourceFactory.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.factories;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.sources.TableSource;
 
 import java.util.Map;
@@ -44,10 +45,11 @@ public interface TableSourceFactory<T> extends TableFactory 
{
        /**
         * Creates and configures a {@link TableSource} based on the given 
{@link CatalogTable} instance.
         *
+        * @param tablePath path of the given {@link CatalogTable}
         * @param table {@link CatalogTable} instance.
         * @return the configured table source.
         */
-       default TableSource<T> createTableSource(CatalogTable table) {
+       default TableSource<T> createTableSource(ObjectPath tablePath, 
CatalogTable table) {
                return createTableSource(table.toProperties());
        }
 
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java
index 9fc3282..e629978 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java
@@ -79,7 +79,7 @@ class DatabaseCalciteSchema implements Schema {
                        } else if (table instanceof ConnectorCatalogTable) {
                                return 
convertConnectorTable((ConnectorCatalogTable<?, ?>) table);
                        } else if (table instanceof CatalogTable) {
-                               return convertCatalogTable((CatalogTable) 
table);
+                               return convertCatalogTable(tablePath, 
(CatalogTable) table);
                        } else {
                                throw new TableException("Unsupported table 
type: " + table);
                        }
@@ -103,9 +103,9 @@ class DatabaseCalciteSchema implements Schema {
                        .orElseThrow(() -> new TableException("Cannot query a 
sink only table."));
        }
 
-       private Table convertCatalogTable(CatalogTable table) {
+       private Table convertCatalogTable(ObjectPath tablePath, CatalogTable 
table) {
                Optional<TableFactory> tableFactory = catalog.getTableFactory();
-               TableSource<Row> tableSource = tableFactory.map(tf -> 
((TableSourceFactory) tf).createTableSource(table))
+               TableSource<Row> tableSource = tableFactory.map(tf -> 
((TableSourceFactory) tf).createTableSource(tablePath, table))
                        
.orElse(TableFactoryUtil.findAndCreateTableSource(table));
 
                if (!(tableSource instanceof StreamTableSource)) {

Reply via email to