This is an automated email from the ASF dual-hosted git repository.
bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new f172129 [FLINK-12989][hive]: Generate HiveTableSink from a Hive
table
f172129 is described below
commit f1721293b0701d584d42bd68671181e332d2ad04
Author: Xuefu Zhang <[email protected]>
AuthorDate: Wed Jun 26 11:56:11 2019 -0700
[FLINK-12989][hive]: Generate HiveTableSink from a Hive table
This PR adds the generation of a HiveTableSink from a Hive table.
This closes #8890.
---
.../batch/connectors/hive/HiveTableFactory.java | 26 +++++++++++-----
.../connectors/hive/HiveTableOutputFormat.java | 36 ++++++++++------------
.../flink/batch/connectors/hive/HiveTableSink.java | 33 ++++++++++++--------
.../flink/table/catalog/hive/HiveCatalog.java | 2 +-
.../connectors/hive/HiveTableFactoryTest.java | 27 +++++++++++++---
.../connectors/hive/HiveTableOutputFormatTest.java | 26 ++++++++--------
.../batch/connectors/hive/HiveTableSinkTest.java | 16 +++++-----
.../table/catalog/GenericInMemoryCatalogTest.java | 8 -----
.../org/apache/flink/table/catalog/ObjectPath.java | 4 ++-
.../flink/table/factories/TableSinkFactory.java | 4 ++-
.../flink/table/factories/TableSourceFactory.java | 4 ++-
.../flink/table/catalog/DatabaseCalciteSchema.java | 6 ++--
12 files changed, 113 insertions(+), 79 deletions(-)
diff --git
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableFactory.java
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableFactory.java
index f3c25ec..fd142da 100644
---
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableFactory.java
+++
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableFactory.java
@@ -20,6 +20,7 @@ package org.apache.flink.batch.connectors.hive;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.config.CatalogConfig;
import org.apache.flink.table.factories.TableFactoryUtil;
import org.apache.flink.table.factories.TableSinkFactory;
@@ -31,13 +32,23 @@ import org.apache.flink.table.sources.TableSource;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.mapred.JobConf;
+
import java.util.List;
import java.util.Map;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
/**
* A table factory implementation for tables stored in Hive catalog.
*/
public class HiveTableFactory implements TableSourceFactory<Row>,
TableSinkFactory<Row> {
+ private HiveConf hiveConf;
+
+ public HiveTableFactory(HiveConf hiveConf) {
+ this.hiveConf = checkNotNull(hiveConf, "hiveConf cannot be
null");
+ }
@Override
public Map<String, String> requiredContext() {
@@ -60,14 +71,14 @@ public class HiveTableFactory implements
TableSourceFactory<Row>, TableSinkFacto
}
@Override
- public TableSource<Row> createTableSource(CatalogTable table) {
+ public TableSource<Row> createTableSource(ObjectPath tablePath,
CatalogTable table) {
Preconditions.checkNotNull(table);
Preconditions.checkArgument(table instanceof CatalogTableImpl);
boolean isGeneric =
Boolean.valueOf(table.getProperties().get(CatalogConfig.IS_GENERIC));
if (!isGeneric) {
- return createInputFormatTableSource(table);
+ return createInputFormatTableSource(tablePath, table);
} else {
return TableFactoryUtil.findAndCreateTableSource(table);
}
@@ -76,20 +87,20 @@ public class HiveTableFactory implements
TableSourceFactory<Row>, TableSinkFacto
/**
* Creates and configures a {@link
org.apache.flink.table.sources.InputFormatTableSource} using the given {@link
CatalogTable}.
*/
- private InputFormatTableSource<Row>
createInputFormatTableSource(CatalogTable table) {
+ private InputFormatTableSource<Row>
createInputFormatTableSource(ObjectPath tablePath, CatalogTable table) {
// TODO: create an InputFormatTableSource from a
HiveCatalogTable instance.
return null;
}
@Override
- public TableSink<Row> createTableSink(CatalogTable table) {
+ public TableSink<Row> createTableSink(ObjectPath tablePath,
CatalogTable table) {
Preconditions.checkNotNull(table);
Preconditions.checkArgument(table instanceof CatalogTableImpl);
boolean isGeneric =
Boolean.valueOf(table.getProperties().get(CatalogConfig.IS_GENERIC));
if (!isGeneric) {
- return createOutputFormatTableSink(table);
+ return createOutputFormatTableSink(tablePath,
(CatalogTableImpl) table);
} else {
return TableFactoryUtil.findAndCreateTableSink(table);
}
@@ -98,9 +109,8 @@ public class HiveTableFactory implements
TableSourceFactory<Row>, TableSinkFacto
/**
* Creates and configures a {@link
org.apache.flink.table.sinks.OutputFormatTableSink} using the given {@link
CatalogTable}.
*/
- private OutputFormatTableSink<Row>
createOutputFormatTableSink(CatalogTable table) {
- // TODO: create an outputFormatTableSink from a
HiveCatalogTable instance.
- return null;
+ private OutputFormatTableSink<Row>
createOutputFormatTableSink(ObjectPath tablePath, CatalogTableImpl table) {
+ return new HiveTableSink(new JobConf(hiveConf), tablePath,
table);
}
}
diff --git
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormat.java
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormat.java
index a0fdf56..898ab53 100644
---
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormat.java
+++
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormat.java
@@ -25,6 +25,9 @@ import
org.apache.flink.api.java.hadoop.common.HadoopOutputFormatCommonBase;
import org.apache.flink.api.java.hadoop.mapreduce.utils.HadoopUtils;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory;
import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper;
@@ -108,8 +111,7 @@ public class HiveTableOutputFormat extends
HadoopOutputFormatCommonBase<Row> imp
private static final long serialVersionUID = 5167529504848109023L;
private transient JobConf jobConf;
- private transient String databaseName;
- private transient String tableName;
+ private transient ObjectPath tablePath;
private transient List<String> partitionColumns;
private transient RowTypeInfo rowTypeInfo;
private transient HiveTablePartition hiveTablePartition;
@@ -139,23 +141,20 @@ public class HiveTableOutputFormat extends
HadoopOutputFormatCommonBase<Row> imp
// to convert Flink object to Hive object
private transient HiveObjectConversion[] hiveConversions;
- public HiveTableOutputFormat(JobConf jobConf, String databaseName,
String tableName, List<String> partitionColumns,
- RowTypeInfo
rowTypeInfo, HiveTablePartition hiveTablePartition,
+ public HiveTableOutputFormat(JobConf jobConf, ObjectPath tablePath,
CatalogTable table, HiveTablePartition hiveTablePartition,
Properties
tableProperties, boolean overwrite) {
super(jobConf.getCredentials());
-
Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName),
"DB name is empty");
-
Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(tableName),
"Table name is empty");
- Preconditions.checkNotNull(rowTypeInfo, "RowTypeInfo cannot be
null");
+ Preconditions.checkNotNull(table, "table cannot be null");
Preconditions.checkNotNull(hiveTablePartition,
"HiveTablePartition cannot be null");
Preconditions.checkNotNull(tableProperties, "Table properties
cannot be null");
HadoopUtils.mergeHadoopConf(jobConf);
this.jobConf = jobConf;
- this.databaseName = databaseName;
- this.tableName = tableName;
- this.partitionColumns = partitionColumns;
- this.rowTypeInfo = rowTypeInfo;
+ this.tablePath = tablePath;
+ this.partitionColumns = table.getPartitionKeys();
+ TableSchema tableSchema = table.getSchema();
+ this.rowTypeInfo = new RowTypeInfo(tableSchema.getFieldTypes(),
tableSchema.getFieldNames());
this.hiveTablePartition = hiveTablePartition;
this.tableProperties = tableProperties;
this.overwrite = overwrite;
@@ -174,8 +173,7 @@ public class HiveTableOutputFormat extends
HadoopOutputFormatCommonBase<Row> imp
out.writeObject(rowTypeInfo);
out.writeObject(hiveTablePartition);
out.writeObject(partitionColumns);
- out.writeObject(databaseName);
- out.writeObject(tableName);
+ out.writeObject(tablePath);
out.writeObject(tableProperties);
}
@@ -197,8 +195,7 @@ public class HiveTableOutputFormat extends
HadoopOutputFormatCommonBase<Row> imp
rowTypeInfo = (RowTypeInfo) in.readObject();
hiveTablePartition = (HiveTablePartition) in.readObject();
partitionColumns = (List<String>) in.readObject();
- databaseName = (String) in.readObject();
- tableName = (String) in.readObject();
+ tablePath = (ObjectPath) in.readObject();
partitionToWriter = new HashMap<>();
tableProperties = (Properties) in.readObject();
}
@@ -209,7 +206,7 @@ public class HiveTableOutputFormat extends
HadoopOutputFormatCommonBase<Row> imp
Path stagingDir = new Path(jobSD.getLocation());
FileSystem fs = stagingDir.getFileSystem(jobConf);
try (HiveMetastoreClientWrapper client =
HiveMetastoreClientFactory.create(new HiveConf(jobConf, HiveConf.class),
hiveVersion)) {
- Table table = client.getTable(databaseName, tableName);
+ Table table =
client.getTable(tablePath.getDatabaseName(), tablePath.getObjectName());
if (!isDynamicPartition) {
commitJob(stagingDir.toString());
}
@@ -336,8 +333,9 @@ public class HiveTableOutputFormat extends
HadoopOutputFormatCommonBase<Row> imp
private void loadPartition(Path srcDir, Table table, Map<String,
String> partSpec, HiveMetastoreClientWrapper client)
throws TException, IOException {
Path tblLocation = new Path(table.getSd().getLocation());
- List<Partition> existingPart =
client.listPartitions(databaseName, tableName,
- new ArrayList<>(partSpec.values()), (short) 1);
+ String dbName = tablePath.getDatabaseName();
+ String tableName = tablePath.getObjectName();
+ List<Partition> existingPart = client.listPartitions(dbName,
tableName, new ArrayList<>(partSpec.values()), (short) 1);
Path destDir = existingPart.isEmpty() ? new Path(tblLocation,
Warehouse.makePartPath(partSpec)) :
new
Path(existingPart.get(0).getSd().getLocation());
moveFiles(srcDir, destDir);
@@ -345,7 +343,7 @@ public class HiveTableOutputFormat extends
HadoopOutputFormatCommonBase<Row> imp
if (existingPart.isEmpty()) {
StorageDescriptor sd = new
StorageDescriptor(hiveTablePartition.getStorageDescriptor());
sd.setLocation(destDir.toString());
- Partition partition =
HiveTableUtil.createHivePartition(databaseName, tableName,
+ Partition partition =
HiveTableUtil.createHivePartition(dbName, tableName,
new ArrayList<>(partSpec.values()), sd,
new HashMap<>());
partition.setValues(new ArrayList<>(partSpec.values()));
client.add_partition(partition);
diff --git
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableSink.java
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableSink.java
index 7b318cc..ab077d5 100644
---
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableSink.java
+++
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableSink.java
@@ -21,6 +21,9 @@ package org.apache.flink.batch.connectors.hive;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory;
import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper;
@@ -55,30 +58,31 @@ import java.util.List;
public class HiveTableSink extends OutputFormatTableSink<Row> {
private final JobConf jobConf;
+ private final CatalogTable catalogTable;
+ private final ObjectPath tablePath;
private final RowTypeInfo rowTypeInfo;
- private final String dbName;
- private final String tableName;
- private final List<String> partitionColumns;
private final String hiveVersion;
// TODO: need OverwritableTableSink to configure this
private boolean overwrite = false;
- public HiveTableSink(JobConf jobConf, RowTypeInfo rowTypeInfo, String
dbName, String tableName,
- List<String> partitionColumns) {
+ public HiveTableSink(JobConf jobConf, ObjectPath tablePath,
CatalogTable table) {
this.jobConf = jobConf;
- this.rowTypeInfo = rowTypeInfo;
- this.dbName = dbName;
- this.tableName = tableName;
- this.partitionColumns = partitionColumns;
+ this.tablePath = tablePath;
+ this.catalogTable = table;
hiveVersion =
jobConf.get(HiveCatalogValidator.CATALOG_HIVE_VERSION,
HiveShimLoader.getHiveVersion());
+ TableSchema tableSchema = table.getSchema();
+ rowTypeInfo = new RowTypeInfo(tableSchema.getFieldTypes(),
tableSchema.getFieldNames());
}
@Override
public OutputFormat<Row> getOutputFormat() {
+ List<String> partitionColumns = catalogTable.getPartitionKeys();
boolean isPartitioned = partitionColumns != null &&
!partitionColumns.isEmpty();
// TODO: need PartitionableTableSink to decide whether it's
dynamic partitioning
boolean isDynamicPartition = isPartitioned;
+ String dbName = tablePath.getDatabaseName();
+ String tableName = tablePath.getObjectName();
try (HiveMetastoreClientWrapper client =
HiveMetastoreClientFactory.create(new HiveConf(jobConf, HiveConf.class),
hiveVersion)) {
Table table = client.getTable(dbName, tableName);
StorageDescriptor sd = table.getSd();
@@ -109,8 +113,13 @@ public class HiveTableSink extends
OutputFormatTableSink<Row> {
sd.setLocation(toStagingDir(sdLocation,
jobConf));
hiveTablePartition = new HiveTablePartition(sd,
null);
}
- return new HiveTableOutputFormat(jobConf, dbName,
tableName,
- partitionColumns, rowTypeInfo,
hiveTablePartition, MetaStoreUtils.getTableMetadata(table), overwrite);
+ return new HiveTableOutputFormat(
+ jobConf,
+ tablePath,
+ catalogTable,
+ hiveTablePartition,
+ MetaStoreUtils.getTableMetadata(table),
+ overwrite);
} catch (TException e) {
throw new CatalogException("Failed to query Hive
metaStore", e);
} catch (IOException e) {
@@ -120,7 +129,7 @@ public class HiveTableSink extends
OutputFormatTableSink<Row> {
@Override
public TableSink<Row> configure(String[] fieldNames,
TypeInformation<?>[] fieldTypes) {
- return new HiveTableSink(jobConf, new RowTypeInfo(fieldTypes,
fieldNames), dbName, tableName, partitionColumns);
+ return new HiveTableSink(jobConf, tablePath, catalogTable);
}
@Override
diff --git
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
index 1a1a312..e8c4342 100644
---
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
+++
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -177,7 +177,7 @@ public class HiveCatalog extends AbstractCatalog {
@Override
public Optional<TableFactory> getTableFactory() {
- return Optional.of(new HiveTableFactory());
+ return Optional.of(new HiveTableFactory(hiveConf));
}
// ------ databases ------
diff --git
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableFactoryTest.java
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableFactoryTest.java
index 6d6847a..4622322 100644
---
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableFactoryTest.java
+++
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableFactoryTest.java
@@ -81,18 +81,37 @@ public class HiveTableFactoryTest {
properties.put("format.fields.1.name", "age");
properties.put("format.fields.1.type", "INT");
- CatalogTable table = new CatalogTableImpl(schema, properties,
"csv table");
-
catalog.createDatabase("mydb", new CatalogDatabaseImpl(new
HashMap<>(), ""), true);
ObjectPath path = new ObjectPath("mydb", "mytable");
+ CatalogTable table = new CatalogTableImpl(schema, properties,
"csv table");
catalog.createTable(path, table, true);
Optional<TableFactory> opt = catalog.getTableFactory();
assertTrue(opt.isPresent());
HiveTableFactory tableFactory = (HiveTableFactory) opt.get();
- TableSource tableSource = tableFactory.createTableSource(table);
+ TableSource tableSource = tableFactory.createTableSource(path,
table);
assertTrue(tableSource instanceof StreamTableSource);
- TableSink tableSink = tableFactory.createTableSink(table);
+ TableSink tableSink = tableFactory.createTableSink(path, table);
assertTrue(tableSink instanceof StreamTableSink);
}
+ @Test
+ public void testHiveTable() throws Exception {
+ TableSchema schema = TableSchema.builder()
+ .field("name", DataTypes.STRING())
+ .field("age", DataTypes.INT())
+ .build();
+
+ Map<String, String> properties = new HashMap<>();
+
+ catalog.createDatabase("mydb", new CatalogDatabaseImpl(new
HashMap<>(), ""), true);
+ ObjectPath path = new ObjectPath("mydb", "mytable");
+ CatalogTable table = new CatalogTableImpl(schema, properties,
"hive table");
+ catalog.createTable(path, table, true);
+ Optional<TableFactory> opt = catalog.getTableFactory();
+ assertTrue(opt.isPresent());
+ HiveTableFactory tableFactory = (HiveTableFactory) opt.get();
+ TableSink tableSink = tableFactory.createTableSink(path, table);
+ assertTrue(tableSink instanceof HiveTableSink);
+ }
+
}
diff --git
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormatTest.java
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormatTest.java
index aa6c874..206f831 100644
---
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormatTest.java
+++
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormatTest.java
@@ -20,8 +20,8 @@ package org.apache.flink.batch.connectors.hive;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTableImpl;
@@ -82,12 +82,13 @@ public class HiveTableOutputFormatTest {
public void testInsertOverwrite() throws Exception {
String dbName = "default";
String tblName = "dest";
- RowTypeInfo rowTypeInfo = createDestTable(dbName, tblName, 0);
+ createDestTable(dbName, tblName, 0);
ObjectPath tablePath = new ObjectPath(dbName, tblName);
+ CatalogBaseTable table = hiveCatalog.getTable(tablePath);
Table hiveTable = hiveCatalog.getHiveTable(tablePath);
// write some data and verify
- HiveTableOutputFormat outputFormat =
createHiveTableOutputFormat(tablePath, hiveTable, rowTypeInfo, null, false);
+ HiveTableOutputFormat outputFormat =
createHiveTableOutputFormat(tablePath, (CatalogTableImpl) table, hiveTable,
null, false);
outputFormat.open(0, 1);
List<Row> toWrite = generateRecords(5);
writeRecords(toWrite, outputFormat);
@@ -96,7 +97,7 @@ public class HiveTableOutputFormatTest {
verifyWrittenData(new Path(hiveTable.getSd().getLocation(),
"0"), toWrite, 0);
// write some data to overwrite existing data and verify
- outputFormat = createHiveTableOutputFormat(tablePath,
hiveTable, rowTypeInfo, null, true);
+ outputFormat = createHiveTableOutputFormat(tablePath,
(CatalogTableImpl) table, hiveTable, null, true);
outputFormat.open(0, 1);
toWrite = generateRecords(3);
writeRecords(toWrite, outputFormat);
@@ -111,13 +112,14 @@ public class HiveTableOutputFormatTest {
public void testInsertIntoStaticPartition() throws Exception {
String dbName = "default";
String tblName = "dest";
- RowTypeInfo rowTypeInfo = createDestTable(dbName, tblName, 1);
+ createDestTable(dbName, tblName, 1);
ObjectPath tablePath = new ObjectPath(dbName, tblName);
Table hiveTable = hiveCatalog.getHiveTable(tablePath);
+ CatalogBaseTable table = hiveCatalog.getTable(tablePath);
Map<String, Object> partSpec = new HashMap<>();
partSpec.put("s", "a");
- HiveTableOutputFormat outputFormat =
createHiveTableOutputFormat(tablePath, hiveTable, rowTypeInfo, partSpec, false);
+ HiveTableOutputFormat outputFormat =
createHiveTableOutputFormat(tablePath, (CatalogTableImpl) table, hiveTable,
partSpec, false);
outputFormat.open(0, 1);
List<Row> toWrite = generateRecords(1);
writeRecords(toWrite, outputFormat);
@@ -133,7 +135,7 @@ public class HiveTableOutputFormatTest {
hiveCatalog.dropTable(tablePath, false);
}
- private RowTypeInfo createDestTable(String dbName, String tblName, int
numPartCols) throws Exception {
+ private void createDestTable(String dbName, String tblName, int
numPartCols) throws Exception {
ObjectPath tablePath = new ObjectPath(dbName, tblName);
TableSchema tableSchema = new TableSchema(
new String[]{"i", "l", "d", "s"},
@@ -145,7 +147,6 @@ public class HiveTableOutputFormatTest {
);
CatalogTable catalogTable = createCatalogTable(tableSchema,
numPartCols);
hiveCatalog.createTable(tablePath, catalogTable, false);
- return new RowTypeInfo(tableSchema.getFieldTypes(),
tableSchema.getFieldNames());
}
private CatalogTable createCatalogTable(TableSchema tableSchema, int
numPartCols) {
@@ -157,15 +158,14 @@ public class HiveTableOutputFormatTest {
return new CatalogTableImpl(tableSchema,
Arrays.asList(partCols), new HashMap<>(), "");
}
- private HiveTableOutputFormat createHiveTableOutputFormat(ObjectPath
tablePath, Table hiveTable,
- RowTypeInfo rowTypeInfo, Map<String, Object> partSpec,
boolean overwrite) throws Exception {
+ private HiveTableOutputFormat createHiveTableOutputFormat(ObjectPath
tablePath, CatalogTable catalogTable, Table hiveTable,
+ Map<String, Object> partSpec, boolean overwrite) throws
Exception {
StorageDescriptor jobSD = hiveTable.getSd().deepCopy();
jobSD.setLocation(hiveTable.getSd().getLocation() +
"/.staging");
HiveTablePartition hiveTablePartition = new
HiveTablePartition(jobSD, partSpec);
JobConf jobConf = new JobConf(hiveConf);
- return new HiveTableOutputFormat(jobConf,
tablePath.getDatabaseName(), tablePath.getObjectName(),
-
HiveCatalog.getFieldNames(hiveTable.getPartitionKeys()), rowTypeInfo,
hiveTablePartition,
- MetaStoreUtils.getTableMetadata(hiveTable),
overwrite);
+ return new HiveTableOutputFormat(jobConf, tablePath,
catalogTable, hiveTablePartition,
+ MetaStoreUtils.getTableMetadata(hiveTable), overwrite);
}
private void verifyWrittenData(Path outputFile, List<Row> expected, int
numPartCols) throws Exception {
diff --git
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSinkTest.java
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSinkTest.java
index b8f68a4..6b25adb 100644
---
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSinkTest.java
+++
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSinkTest.java
@@ -91,8 +91,8 @@ public class HiveTableSinkTest {
tableEnv.registerDataSet("src", execEnv.fromCollection(toWrite,
rowTypeInfo));
Table hiveTable = hiveCatalog.getHiveTable(tablePath);
- tableEnv.registerTableSink("destSink", new HiveTableSink(new
JobConf(hiveConf), rowTypeInfo,
- "default", "dest",
HiveCatalog.getFieldNames(hiveTable.getPartitionKeys())));
+ CatalogTable table = (CatalogTable)
hiveCatalog.getTable(tablePath);
+ tableEnv.registerTableSink("destSink", new HiveTableSink(new
JobConf(hiveConf), tablePath, table));
tableEnv.sqlQuery("select * from src").insertInto("destSink");
execEnv.execute();
@@ -113,8 +113,8 @@ public class HiveTableSinkTest {
tableEnv.registerDataSet("src", execEnv.fromCollection(toWrite,
rowTypeInfo));
Table hiveTable = hiveCatalog.getHiveTable(tablePath);
- tableEnv.registerTableSink("destSink", new HiveTableSink(new
JobConf(hiveConf), rowTypeInfo,
- "default", "dest",
HiveCatalog.getFieldNames(hiveTable.getPartitionKeys())));
+ CatalogTable table = (CatalogTable)
hiveCatalog.getTable(tablePath);
+ tableEnv.registerTableSink("destSink", new HiveTableSink(new
JobConf(hiveConf), tablePath, table));
tableEnv.sqlQuery("select * from src").insertInto("destSink");
execEnv.execute();
@@ -162,8 +162,8 @@ public class HiveTableSinkTest {
tableEnv.registerDataSet("complexSrc",
execEnv.fromCollection(toWrite, rowTypeInfo));
Table hiveTable = hiveCatalog.getHiveTable(tablePath);
- tableEnv.registerTableSink("complexSink", new HiveTableSink(new
JobConf(hiveConf), rowTypeInfo,
- "default", "dest",
HiveCatalog.getFieldNames(hiveTable.getPartitionKeys())));
+ CatalogTable catalogTable = (CatalogTable)
hiveCatalog.getTable(tablePath);
+ tableEnv.registerTableSink("complexSink", new HiveTableSink(new
JobConf(hiveConf), tablePath, catalogTable));
tableEnv.sqlQuery("select * from
complexSrc").insertInto("complexSink");
execEnv.execute();
@@ -190,8 +190,8 @@ public class HiveTableSinkTest {
tableEnv.registerDataSet("nestedSrc",
execEnv.fromCollection(toWrite, rowTypeInfo));
hiveTable = hiveCatalog.getHiveTable(tablePath);
- tableEnv.registerTableSink("nestedSink", new HiveTableSink(new
JobConf(hiveConf), rowTypeInfo,
- "default", "dest",
HiveCatalog.getFieldNames(hiveTable.getPartitionKeys())));
+ catalogTable = (CatalogTable) hiveCatalog.getTable(tablePath);
+ tableEnv.registerTableSink("nestedSink", new HiveTableSink(new
JobConf(hiveConf), tablePath, catalogTable));
tableEnv.sqlQuery("select * from
nestedSrc").insertInto("nestedSink");
execEnv.execute();
diff --git
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
index 950115c..e401223 100644
---
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
+++
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
@@ -158,14 +158,6 @@ public class GenericInMemoryCatalogTest extends
CatalogTestBase {
}
@Override
- public CatalogTable createStreamingTable() {
- return new CatalogTableImpl(
- createTableSchema(),
- getStreamingTableProperties(),
- TEST_COMMENT);
- }
-
- @Override
public CatalogPartition createPartition() {
return new GenericCatalogPartition(getBatchTableProperties(),
"Generic batch table");
}
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ObjectPath.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ObjectPath.java
index d931651..d096e26 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ObjectPath.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ObjectPath.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.catalog;
import org.apache.flink.util.StringUtils;
+import java.io.Serializable;
import java.util.Objects;
import static org.apache.flink.util.Preconditions.checkArgument;
@@ -27,7 +28,7 @@ import static
org.apache.flink.util.Preconditions.checkArgument;
/**
* A database name and object (table/view/function) name combo in a catalog.
*/
-public class ObjectPath {
+public class ObjectPath implements Serializable {
private final String databaseName;
private final String objectName;
@@ -89,4 +90,5 @@ public class ObjectPath {
public String toString() {
return String.format("%s.%s", databaseName, objectName);
}
+
}
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableSinkFactory.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableSinkFactory.java
index 7f79b77..05d0f34 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableSinkFactory.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableSinkFactory.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.factories;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.sinks.TableSink;
import java.util.Map;
@@ -44,10 +45,11 @@ public interface TableSinkFactory<T> extends TableFactory {
/**
* Creates and configures a {@link TableSink} based on the given {@link
CatalogTable} instance.
*
+ * @param tablePath path of the given {@link CatalogTable}
* @param table {@link CatalogTable} instance.
* @return the configured table sink.
*/
- default TableSink<T> createTableSink(CatalogTable table) {
+ default TableSink<T> createTableSink(ObjectPath tablePath, CatalogTable
table) {
return createTableSink(table.toProperties());
}
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableSourceFactory.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableSourceFactory.java
index 0cf150e..c0f97d9 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableSourceFactory.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableSourceFactory.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.factories;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.sources.TableSource;
import java.util.Map;
@@ -44,10 +45,11 @@ public interface TableSourceFactory<T> extends TableFactory
{
/**
* Creates and configures a {@link TableSource} based on the given
{@link CatalogTable} instance.
*
+ * @param tablePath path of the given {@link CatalogTable}
* @param table {@link CatalogTable} instance.
* @return the configured table source.
*/
- default TableSource<T> createTableSource(CatalogTable table) {
+ default TableSource<T> createTableSource(ObjectPath tablePath,
CatalogTable table) {
return createTableSource(table.toProperties());
}
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java
index 9fc3282..e629978 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java
@@ -79,7 +79,7 @@ class DatabaseCalciteSchema implements Schema {
} else if (table instanceof ConnectorCatalogTable) {
return
convertConnectorTable((ConnectorCatalogTable<?, ?>) table);
} else if (table instanceof CatalogTable) {
- return convertCatalogTable((CatalogTable)
table);
+ return convertCatalogTable(tablePath,
(CatalogTable) table);
} else {
throw new TableException("Unsupported table
type: " + table);
}
@@ -103,9 +103,9 @@ class DatabaseCalciteSchema implements Schema {
.orElseThrow(() -> new TableException("Cannot query a
sink only table."));
}
- private Table convertCatalogTable(CatalogTable table) {
+ private Table convertCatalogTable(ObjectPath tablePath, CatalogTable
table) {
Optional<TableFactory> tableFactory = catalog.getTableFactory();
- TableSource<Row> tableSource = tableFactory.map(tf ->
((TableSourceFactory) tf).createTableSource(table))
+ TableSource<Row> tableSource = tableFactory.map(tf ->
((TableSourceFactory) tf).createTableSource(tablePath, table))
.orElse(TableFactoryUtil.findAndCreateTableSource(table));
if (!(tableSource instanceof StreamTableSource)) {