This is an automated email from the ASF dual-hosted git repository.
bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new f919b23 [FLINK-12233][hive] Support table related operations in HiveCatalog
f919b23 is described below
commit f919b23890913aa8d3f6241a1d70ef82006863fc
Author: bowen.li <[email protected]>
AuthorDate: Tue May 7 11:52:17 2019 -0700
[FLINK-12233][hive] Support table related operations in HiveCatalog
This PR introduced HiveCatalogTable and implemented table related catalog APIs in HiveCatalog.
This closes #8353.
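
For readers who want to try the new APIs, here is a minimal usage sketch (not part of the patch). It assumes a reachable Hive metastore at the thrift URI shown, and that HiveCatalog exposes the same (name, metastore URI) constructor that GenericHiveMetastoreCatalog uses below; the catalog calls themselves (open/createTable/getTable/dropTable/close) are the ones this commit implements or inherits:

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.table.api.TableSchema;
    import org.apache.flink.table.catalog.CatalogBaseTable;
    import org.apache.flink.table.catalog.ObjectPath;
    import org.apache.flink.table.catalog.hive.HiveCatalog;
    import org.apache.flink.table.catalog.hive.HiveCatalogTable;

    import java.util.HashMap;

    public class HiveCatalogExample {
        public static void main(String[] args) throws Exception {
            // Hypothetical metastore URI; any reachable Hive metastore should work.
            HiveCatalog catalog = new HiveCatalog("myhive", "thrift://localhost:9083");
            catalog.open();

            TableSchema schema = new TableSchema(
                new String[]{"name", "age"},
                new TypeInformation[]{Types.STRING, Types.INT});

            // Non-partitioned table: (schema, properties, comment).
            HiveCatalogTable table = new HiveCatalogTable(schema, new HashMap<>(), "a demo table");

            ObjectPath path = new ObjectPath("default", "demo");
            catalog.createTable(path, table, false);    // ignoreIfExists = false

            CatalogBaseTable back = catalog.getTable(path);
            System.out.println(back.getComment());      // "a demo table"

            catalog.dropTable(path, false);             // ignoreIfNotExists = false
            catalog.close();
        }
    }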
---
.../catalog/hive/GenericHiveMetastoreCatalog.java | 238 ++++++++++-----------
.../flink/table/catalog/hive/HiveCatalog.java | 110 +++++++---
.../flink/table/catalog/hive/HiveCatalogBase.java | 190 ++++++++++++++++
.../flink/table/catalog/hive/HiveCatalogTable.java | 105 +++++++++
.../flink/table/catalog/hive/HiveCatalogUtil.java | 1 -
.../flink/table/catalog/hive/HiveTableConfig.java | 4 +-
.../hive/util/GenericHiveMetastoreCatalogUtil.java | 171 ---------------
.../HiveTableUtil.java} | 31 ++-
.../hive/GenericHiveMetastoreCatalogTest.java | 3 +-
.../flink/table/catalog/hive/HiveCatalogTest.java | 110 ++++------
.../flink/table/catalog/hive/HiveTestUtils.java | 7 +
.../table/catalog/GenericInMemoryCatalogTest.java | 4 +-
.../flink/table/catalog/CatalogTestBase.java | 33 ++-
.../flink/table/catalog/CatalogTestUtil.java | 13 +-
14 files changed, 594 insertions(+), 426 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
index 843b4a2..c6d14f4 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
@@ -18,12 +18,17 @@
package org.apache.flink.table.catalog.hive;
+import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogDatabase;
import org.apache.flink.table.catalog.CatalogFunction;
import org.apache.flink.table.catalog.CatalogPartition;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogView;
import org.apache.flink.table.catalog.GenericCatalogDatabase;
+import org.apache.flink.table.catalog.GenericCatalogTable;
+import org.apache.flink.table.catalog.GenericCatalogView;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
@@ -33,25 +38,28 @@ import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
-import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
import org.apache.flink.table.catalog.hive.util.GenericHiveMetastoreCatalogUtil;
+import org.apache.flink.table.catalog.hive.util.HiveTableUtil;
import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.TableType;
-import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
import org.apache.hadoop.hive.metastore.api.Database;
-import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.metastore.api.UnknownDBException;
-import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
/**
 * A catalog that persists all Flink streaming and batch metadata by using Hive metastore as a persistent storage.
@@ -59,6 +67,10 @@ import java.util.List;
public class GenericHiveMetastoreCatalog extends HiveCatalogBase {
    private static final Logger LOG = LoggerFactory.getLogger(GenericHiveMetastoreCatalog.class);
+   // Prefix used to distinguish properties created by Hive and Flink,
+   // as Hive metastore has its own properties created upon table creation and migration between different versions of metastore.
+   private static final String FLINK_PROPERTY_PREFIX = "flink.";
+
    public GenericHiveMetastoreCatalog(String catalogName, String hivemetastoreURI) {
        super(catalogName, hivemetastoreURI);
@@ -95,144 +107,103 @@ public class GenericHiveMetastoreCatalog extends HiveCatalogBase {
// ------ tables and views------
@Override
-   public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
-           throws TableNotExistException, CatalogException {
-       try {
-           client.dropTable(
-               tablePath.getDatabaseName(),
-               tablePath.getObjectName(),
-               // Indicate whether associated data should be deleted.
-               // Set to 'true' for now because Flink tables shouldn't have data in Hive. Can be changed later if necessary
-               true,
-               ignoreIfNotExists);
-       } catch (NoSuchObjectException e) {
-           if (!ignoreIfNotExists) {
-               throw new TableNotExistException(catalogName, tablePath);
-           }
-       } catch (TException e) {
+   protected void validateCatalogBaseTable(CatalogBaseTable table)
+           throws CatalogException {
+       if (!(table instanceof GenericCatalogTable) && !(table instanceof GenericCatalogView)) {
            throw new CatalogException(
-               String.format("Failed to drop table %s", tablePath.getFullName()), e);
+               "GenericHiveMetastoreCatalog can only operate on GenericCatalogTable and GenericCatalogView.");
        }
    }
@Override
-   public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists)
-           throws TableNotExistException, TableAlreadyExistException, CatalogException {
-       try {
-           // alter_table() doesn't throw a clear exception when target table doesn't exist. Thus, check the table existence explicitly
-           if (tableExists(tablePath)) {
-               ObjectPath newPath = new ObjectPath(tablePath.getDatabaseName(), newTableName);
-               // alter_table() doesn't throw a clear exception when new table already exists. Thus, check the table existence explicitly
-               if (tableExists(newPath)) {
-                   throw new TableAlreadyExistException(catalogName, newPath);
-               } else {
-                   Table table = getHiveTable(tablePath);
-                   table.setTableName(newTableName);
-                   client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), table);
-               }
-           } else if (!ignoreIfNotExists) {
-               throw new TableNotExistException(catalogName, tablePath);
-           }
-       } catch (TException e) {
-           throw new CatalogException(
-               String.format("Failed to rename table %s", tablePath.getFullName()), e);
-       }
-   }
+   protected CatalogBaseTable createCatalogBaseTable(Table hiveTable) {
+       // Table schema
+       TableSchema tableSchema = HiveTableUtil.createTableSchema(
+           hiveTable.getSd().getCols(), hiveTable.getPartitionKeys());
-   @Override
-   public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
-           throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
-       if (!databaseExists(tablePath.getDatabaseName())) {
-           throw new DatabaseNotExistException(catalogName, tablePath.getDatabaseName());
-       } else {
-           try {
-               client.createTable(GenericHiveMetastoreCatalogUtil.createHiveTable(tablePath, table));
-           } catch (AlreadyExistsException e) {
-               if (!ignoreIfExists) {
-                   throw new TableAlreadyExistException(catalogName, tablePath);
-               }
-           } catch (TException e) {
-               throw new CatalogException(String.format("Failed to create table %s", tablePath.getFullName()), e);
-           }
+       // Table properties
+       Map<String, String> properties = retrieveFlinkProperties(hiveTable.getParameters());
+
+       // Table comment
+       String comment = properties.remove(HiveTableConfig.TABLE_COMMENT);
+
+       // Partition keys
+       List<String> partitionKeys = new ArrayList<>();
+
+       if (!hiveTable.getPartitionKeys().isEmpty()) {
+           partitionKeys = hiveTable.getPartitionKeys().stream()
+               .map(fs -> fs.getName())
+               .collect(Collectors.toList());
        }
-   }
-   @Override
-   public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists)
-           throws TableNotExistException, CatalogException {
-       if (!tableExists(tablePath)) {
-           if (!ignoreIfNotExists) {
-               throw new TableNotExistException(catalogName, tablePath);
-           }
+       if (TableType.valueOf(hiveTable.getTableType()) == TableType.VIRTUAL_VIEW) {
+           return new GenericCatalogView(
+               hiveTable.getViewOriginalText(),
+               hiveTable.getViewExpandedText(),
+               tableSchema,
+               properties,
+               comment
+           );
        } else {
-           // IMetastoreClient.alter_table() requires the table to have a valid location, which it doesn't in this case
-           // Thus we have to translate alterTable() into (dropTable() + createTable())
-           dropTable(tablePath, false);
-           try {
-               createTable(tablePath, newTable, false);
-           } catch (TableAlreadyExistException | DatabaseNotExistException e) {
-               // These exceptions wouldn't be thrown, unless a concurrent operation is triggered in Hive
-               throw new CatalogException(
-                   String.format("Failed to alter table %s", tablePath), e);
-           }
+           return new GenericCatalogTable(
+               tableSchema, partitionKeys, properties, comment);
        }
    }
@Override
-   public List<String> listTables(String databaseName) throws DatabaseNotExistException, CatalogException {
-       try {
-           return client.getAllTables(databaseName);
-       } catch (UnknownDBException e) {
-           throw new DatabaseNotExistException(catalogName, databaseName);
-       } catch (TException e) {
-           throw new CatalogException(
-               String.format("Failed to list tables in database %s", databaseName), e);
-       }
-   }
+   protected Table createHiveTable(ObjectPath tablePath, CatalogBaseTable table) {
+       Map<String, String> properties = new HashMap<>(table.getProperties());
-   @Override
-   public List<String> listViews(String databaseName) throws DatabaseNotExistException, CatalogException {
-       try {
-           return client.getTables(
-               databaseName,
-               null, // table pattern
-               TableType.VIRTUAL_VIEW);
-       } catch (UnknownDBException e) {
-           throw new DatabaseNotExistException(catalogName, databaseName);
-       } catch (TException e) {
-           throw new CatalogException(
-               String.format("Failed to list views in database %s", databaseName), e);
-       }
-   }
+       // Table comment
+       properties.put(HiveTableConfig.TABLE_COMMENT, table.getComment());
-   @Override
-   public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistException, CatalogException {
-       Table hiveTable = getHiveTable(tablePath);
+       Table hiveTable = new Table();
+       hiveTable.setDbName(tablePath.getDatabaseName());
+       hiveTable.setTableName(tablePath.getObjectName());
+       hiveTable.setCreateTime((int) (System.currentTimeMillis() / 1000));
-       return GenericHiveMetastoreCatalogUtil.createCatalogTable(hiveTable);
-   }
+       // Table properties
+       hiveTable.setParameters(maskFlinkProperties(properties));
-   protected Table getHiveTable(ObjectPath tablePath) throws TableNotExistException {
-       try {
-           return client.getTable(tablePath.getDatabaseName(), tablePath.getObjectName());
-       } catch (NoSuchObjectException e) {
-           throw new TableNotExistException(catalogName, tablePath);
-       } catch (TException e) {
-           throw new CatalogException(
-               String.format("Failed to get table %s from Hive metastore", tablePath.getFullName()), e);
-       }
-   }
+       // Hive table's StorageDescriptor
+       StorageDescriptor sd = new StorageDescriptor();
+       sd.setSerdeInfo(new SerDeInfo(null, null, new HashMap<>()));
-   @Override
-   public boolean tableExists(ObjectPath tablePath) throws CatalogException {
-       try {
-           return client.tableExists(tablePath.getDatabaseName(), tablePath.getObjectName());
-       } catch (UnknownDBException e) {
-           return false;
-       } catch (TException e) {
-           throw new CatalogException(
-               String.format("Failed to check whether table %s exists or not.", tablePath.getFullName()), e);
+       List<FieldSchema> allColumns = HiveTableUtil.createHiveColumns(table.getSchema());
+
+       // Table columns and partition keys
+       if (table instanceof CatalogTable) {
+           CatalogTable catalogTable = (CatalogTable) table;
+
+           if (catalogTable.isPartitioned()) {
+               int partitionKeySize = catalogTable.getPartitionKeys().size();
+               List<FieldSchema> regularColumns = allColumns.subList(0, allColumns.size() - partitionKeySize);
+               List<FieldSchema> partitionColumns = allColumns.subList(allColumns.size() - partitionKeySize, allColumns.size());
+
+               sd.setCols(regularColumns);
+               hiveTable.setPartitionKeys(partitionColumns);
+           } else {
+               sd.setCols(allColumns);
+               hiveTable.setPartitionKeys(new ArrayList<>());
+           }
+       } else if (table instanceof CatalogView) {
+           CatalogView view = (CatalogView) table;
+
+           // TODO: [FLINK-12398] Support partitioned view in catalog API
+           sd.setCols(allColumns);
+           hiveTable.setPartitionKeys(new ArrayList<>());
+
+           hiveTable.setViewOriginalText(view.getOriginalQuery());
+           hiveTable.setViewExpandedText(view.getExpandedQuery());
+           hiveTable.setTableType(TableType.VIRTUAL_VIEW.name());
+       } else {
+           throw new IllegalArgumentException(
+               "GenericHiveMetastoreCatalog only supports CatalogTable and CatalogView");
        }
+
+       hiveTable.setSd(sd);
+
+       return hiveTable;
    }
// ------ partitions ------
@@ -363,4 +334,23 @@ public class GenericHiveMetastoreCatalog extends HiveCatalogBase {
throw new UnsupportedOperationException();
}
+   // ------ utils ------
+
+   /**
+    * Filter out Hive-created properties, and return Flink-created properties.
+    */
+   private static Map<String, String> retrieveFlinkProperties(Map<String, String> hiveTableParams) {
+       return hiveTableParams.entrySet().stream()
+           .filter(e -> e.getKey().startsWith(FLINK_PROPERTY_PREFIX))
+           .collect(Collectors.toMap(e -> e.getKey().replace(FLINK_PROPERTY_PREFIX, ""), e -> e.getValue()));
+   }
+
+   /**
+    * Add a prefix to Flink-created properties to distinguish them from Hive-created properties.
+    */
+   private static Map<String, String> maskFlinkProperties(Map<String, String> properties) {
+       return properties.entrySet().stream()
+           .filter(e -> e.getKey() != null && e.getValue() != null)
+           .collect(Collectors.toMap(e -> FLINK_PROPERTY_PREFIX + e.getKey(), e -> e.getValue()));
+   }
}
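
The FLINK_PROPERTY_PREFIX masking above is what keeps Flink-written properties separate from the parameters Hive itself attaches to a table. A self-contained sketch of the round trip, with one hedge: the patch strips the prefix via String.replace, which would also rewrite any interior occurrence of "flink."; this sketch uses substring to strip only the leading prefix:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.stream.Collectors;

    public class FlinkPropertyPrefixDemo {
        private static final String FLINK_PROPERTY_PREFIX = "flink.";

        // Mirror of maskFlinkProperties(): prefix every Flink-created entry.
        static Map<String, String> mask(Map<String, String> props) {
            return props.entrySet().stream()
                .filter(e -> e.getKey() != null && e.getValue() != null)
                .collect(Collectors.toMap(e -> FLINK_PROPERTY_PREFIX + e.getKey(), Map.Entry::getValue));
        }

        // Mirror of retrieveFlinkProperties(): keep only prefixed entries, strip the prefix.
        static Map<String, String> retrieve(Map<String, String> params) {
            return params.entrySet().stream()
                .filter(e -> e.getKey().startsWith(FLINK_PROPERTY_PREFIX))
                .collect(Collectors.toMap(e -> e.getKey().substring(FLINK_PROPERTY_PREFIX.length()), Map.Entry::getValue));
        }

        public static void main(String[] args) {
            Map<String, String> flinkProps = new HashMap<>();
            flinkProps.put("connector.type", "filesystem");

            Map<String, String> stored = mask(flinkProps);      // {flink.connector.type=filesystem}
            stored.put("transient_lastDdlTime", "1557255137");  // Hive adds its own parameters

            // Only the "flink."-prefixed entries survive the round trip.
            System.out.println(retrieve(stored));               // {connector.type=filesystem}
        }
    }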
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
index e1c2c30..5ec6fd8 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -18,11 +18,13 @@
package org.apache.flink.table.catalog.hive;
+import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogDatabase;
import org.apache.flink.table.catalog.CatalogFunction;
import org.apache.flink.table.catalog.CatalogPartition;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
@@ -32,18 +34,26 @@ import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
-import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.catalog.hive.util.HiveTableUtil;
import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
/**
* A catalog implementation for Hive.
@@ -88,48 +98,84 @@ public class HiveCatalog extends HiveCatalogBase {
// ------ tables and views------
@Override
-   public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
-           throws TableNotExistException, CatalogException {
-       throw new UnsupportedOperationException();
+   protected void validateCatalogBaseTable(CatalogBaseTable table)
+           throws CatalogException {
+       // TODO: validate HiveCatalogView
+       if (!(table instanceof HiveCatalogTable)) {
+           throw new CatalogException(
+               "HiveCatalog can only operate on HiveCatalogTable and HiveCatalogView.");
+       }
    }
@Override
-   public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists)
-           throws TableNotExistException, TableAlreadyExistException, CatalogException {
-       throw new UnsupportedOperationException();
-   }
+   protected CatalogBaseTable createCatalogBaseTable(Table hiveTable) {
+       // Table schema
+       TableSchema tableSchema =
+           HiveTableUtil.createTableSchema(hiveTable.getSd().getCols(), hiveTable.getPartitionKeys());

-   @Override
-   public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
-           throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
-       throw new UnsupportedOperationException();
-   }
+       // Table properties
+       Map<String, String> properties = hiveTable.getParameters();

-   @Override
-   public void alterTable(ObjectPath tableName, CatalogBaseTable newTable, boolean ignoreIfNotExists)
-           throws TableNotExistException, CatalogException {
-       throw new UnsupportedOperationException();
-   }
+       // Table comment
+       String comment = properties.remove(HiveTableConfig.TABLE_COMMENT);

-   @Override
-   public List<String> listTables(String databaseName)
-           throws DatabaseNotExistException, CatalogException {
-       throw new UnsupportedOperationException();
-   }
+       // Partition keys
+       List<String> partitionKeys = new ArrayList<>();

-   @Override
-   public List<String> listViews(String databaseName) throws DatabaseNotExistException, CatalogException {
-       throw new UnsupportedOperationException();
-   }
+       if (!hiveTable.getPartitionKeys().isEmpty()) {
+           partitionKeys = hiveTable.getPartitionKeys().stream()
+               .map(fs -> fs.getName())
+               .collect(Collectors.toList());
+       }

-   @Override
-   public CatalogBaseTable getTable(ObjectPath objectPath) throws TableNotExistException, CatalogException {
-       throw new UnsupportedOperationException();
+       return new HiveCatalogTable(tableSchema, partitionKeys, properties, comment);
    }
@Override
-   public boolean tableExists(ObjectPath objectPath) throws CatalogException {
-       throw new UnsupportedOperationException();
+   protected Table createHiveTable(ObjectPath tablePath, CatalogBaseTable table) {
+       Map<String, String> properties = new HashMap<>(table.getProperties());
+
+       // Table comment
+       properties.put(HiveTableConfig.TABLE_COMMENT, table.getComment());
+
+       Table hiveTable = new Table();
+       hiveTable.setDbName(tablePath.getDatabaseName());
+       hiveTable.setTableName(tablePath.getObjectName());
+       hiveTable.setCreateTime((int) (System.currentTimeMillis() / 1000));
+
+       // Table properties
+       hiveTable.setParameters(properties);
+
+       // Hive table's StorageDescriptor
+       // TODO: This is very basic Hive table.
+       // [FLINK-11479] Add input/output format and SerDeLib information for Hive tables in HiveCatalogUtil#createHiveTable
+       StorageDescriptor sd = new StorageDescriptor();
+       sd.setSerdeInfo(new SerDeInfo(null, null, new HashMap<>()));
+
+       List<FieldSchema> allColumns = HiveTableUtil.createHiveColumns(table.getSchema());
+
+       // Table columns and partition keys
+       if (table instanceof CatalogTable) {
+           HiveCatalogTable catalogTable = (HiveCatalogTable) table;
+
+           if (catalogTable.isPartitioned()) {
+               int partitionKeySize = catalogTable.getPartitionKeys().size();
+               List<FieldSchema> regularColumns = allColumns.subList(0, allColumns.size() - partitionKeySize);
+               List<FieldSchema> partitionColumns = allColumns.subList(allColumns.size() - partitionKeySize, allColumns.size());
+
+               sd.setCols(regularColumns);
+               hiveTable.setPartitionKeys(partitionColumns);
+           } else {
+               sd.setCols(allColumns);
+               hiveTable.setPartitionKeys(new ArrayList<>());
+           }
+       } else {
+           throw new UnsupportedOperationException("HiveCatalog doesn't support view yet");
+       }
+
+       hiveTable.setSd(sd);
+
+       return hiveTable;
    }
// ------ partitions ------
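
Both createHiveTable() implementations rely on the same convention: partition columns must be the trailing columns of the Flink schema, because the column list is split at size - partitionKeys.size(). A minimal sketch of that split with plain lists:

    import java.util.Arrays;
    import java.util.List;

    public class PartitionSplitDemo {
        public static void main(String[] args) {
            List<String> allColumns = Arrays.asList("name", "age", "dt");  // schema order
            int partitionKeySize = 1;                                      // partition key: dt

            // Regular columns go into the StorageDescriptor; the tail becomes Hive partition keys.
            List<String> regularColumns = allColumns.subList(0, allColumns.size() - partitionKeySize);
            List<String> partitionColumns = allColumns.subList(allColumns.size() - partitionKeySize, allColumns.size());

            System.out.println(regularColumns);    // [name, age]
            System.out.println(partitionColumns);  // [dt]
        }
    }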
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java
index 7c2fdcc..5c44ebc 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java
@@ -19,21 +19,28 @@
package org.apache.flink.table.catalog.hive;
import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.util.StringUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
+import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.UnknownDBException;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -89,6 +96,34 @@ public abstract class HiveCatalogBase implements Catalog {
}
}
+   // ------ APIs ------
+
+   /**
+    * Validate input base table.
+    *
+    * @param catalogBaseTable the base table to be validated
+    * @throws CatalogException thrown if the input base table is invalid.
+    */
+   protected abstract void validateCatalogBaseTable(CatalogBaseTable catalogBaseTable)
+       throws CatalogException;
+
+   /**
+    * Create a CatalogBaseTable from a Hive table.
+    *
+    * @param hiveTable a Hive table
+    * @return a CatalogBaseTable
+    */
+   protected abstract CatalogBaseTable createCatalogBaseTable(Table hiveTable);
+
+   /**
+    * Create a Hive table from a CatalogBaseTable.
+    *
+    * @param tablePath path of the table
+    * @param table a CatalogBaseTable
+    * @return a Hive table
+    */
+   protected abstract Table createHiveTable(ObjectPath tablePath, CatalogBaseTable table);
+
@Override
public void open() throws CatalogException {
if (client == null) {
@@ -197,4 +232,159 @@ public abstract class HiveCatalogBase implements Catalog {
            throw new CatalogException(String.format("Failed to alter database %s", name), e);
        }
    }
+
+   // ------ tables ------
+
+   @Override
+   public CatalogBaseTable getTable(ObjectPath tablePath)
+           throws TableNotExistException, CatalogException {
+       return createCatalogBaseTable(getHiveTable(tablePath));
+   }
+
+   @Override
+   public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
+           throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
+       validateCatalogBaseTable(table);
+
+       if (!databaseExists(tablePath.getDatabaseName())) {
+           throw new DatabaseNotExistException(catalogName, tablePath.getDatabaseName());
+       } else {
+           try {
+               client.createTable(createHiveTable(tablePath, table));
+           } catch (AlreadyExistsException e) {
+               if (!ignoreIfExists) {
+                   throw new TableAlreadyExistException(catalogName, tablePath);
+               }
+           } catch (TException e) {
+               throw new CatalogException(String.format("Failed to create table %s", tablePath.getFullName()), e);
+           }
+       }
+   }
+
+   @Override
+   public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists)
+           throws TableNotExistException, TableAlreadyExistException, CatalogException {
+       try {
+           // alter_table() doesn't throw a clear exception when target table doesn't exist.
+           // Thus, check the table existence explicitly
+           if (tableExists(tablePath)) {
+               ObjectPath newPath = new ObjectPath(tablePath.getDatabaseName(), newTableName);
+               // alter_table() doesn't throw a clear exception when new table already exists.
+               // Thus, check the table existence explicitly
+               if (tableExists(newPath)) {
+                   throw new TableAlreadyExistException(catalogName, newPath);
+               } else {
+                   Table table = getHiveTable(tablePath);
+                   table.setTableName(newTableName);
+                   client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), table);
+               }
+           } else if (!ignoreIfNotExists) {
+               throw new TableNotExistException(catalogName, tablePath);
+           }
+       } catch (TException e) {
+           throw new CatalogException(
+               String.format("Failed to rename table %s", tablePath.getFullName()), e);
+       }
+   }
+
+   @Override
+   public void alterTable(ObjectPath tablePath, CatalogBaseTable newCatalogTable, boolean ignoreIfNotExists)
+           throws TableNotExistException, CatalogException {
+       validateCatalogBaseTable(newCatalogTable);
+
+       try {
+           if (!tableExists(tablePath)) {
+               if (!ignoreIfNotExists) {
+                   throw new TableNotExistException(catalogName, tablePath);
+               }
+           } else {
+               // TODO: [FLINK-12452] alterTable() in all catalogs should ensure existing base table and the new one are of the same type
+               Table newTable = createHiveTable(tablePath, newCatalogTable);
+
+               // client.alter_table() requires a valid location
+               // thus, if new table doesn't have that, it reuses location of the old table
+               if (!newTable.getSd().isSetLocation()) {
+                   Table oldTable = getHiveTable(tablePath);
+                   newTable.getSd().setLocation(oldTable.getSd().getLocation());
+               }
+
+               client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), newTable);
+           }
+       } catch (TException e) {
+           throw new CatalogException(
+               String.format("Failed to alter table %s", tablePath.getFullName()), e);
+       }
+   }
+
+   @Override
+   public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
+           throws TableNotExistException, CatalogException {
+       try {
+           client.dropTable(
+               tablePath.getDatabaseName(),
+               tablePath.getObjectName(),
+               // Indicate whether associated data should be deleted.
+               // Set to 'true' for now because Flink tables shouldn't have data in Hive. Can be changed later if necessary
+               true,
+               ignoreIfNotExists);
+       } catch (NoSuchObjectException e) {
+           if (!ignoreIfNotExists) {
+               throw new TableNotExistException(catalogName, tablePath);
+           }
+       } catch (TException e) {
+           throw new CatalogException(
+               String.format("Failed to drop table %s", tablePath.getFullName()), e);
+       }
+   }
+
+   @Override
+   public List<String> listTables(String databaseName)
+           throws DatabaseNotExistException, CatalogException {
+       try {
+           return client.getAllTables(databaseName);
+       } catch (UnknownDBException e) {
+           throw new DatabaseNotExistException(catalogName, databaseName);
+       } catch (TException e) {
+           throw new CatalogException(
+               String.format("Failed to list tables in database %s", databaseName), e);
+       }
+   }
+
+   @Override
+   public List<String> listViews(String databaseName) throws DatabaseNotExistException, CatalogException {
+       try {
+           return client.getTables(
+               databaseName,
+               null, // table pattern
+               TableType.VIRTUAL_VIEW);
+       } catch (UnknownDBException e) {
+           throw new DatabaseNotExistException(catalogName, databaseName);
+       } catch (TException e) {
+           throw new CatalogException(
+               String.format("Failed to list views in database %s", databaseName), e);
+       }
+   }
+
+   @Override
+   public boolean tableExists(ObjectPath tablePath) throws CatalogException {
+       try {
+           return client.tableExists(tablePath.getDatabaseName(), tablePath.getObjectName());
+       } catch (UnknownDBException e) {
+           return false;
+       } catch (TException e) {
+           throw new CatalogException(
+               String.format("Failed to check whether table %s exists or not.", tablePath.getFullName()), e);
+       }
+   }
+
+   private Table getHiveTable(ObjectPath tablePath) throws TableNotExistException {
+       try {
+           return client.getTable(tablePath.getDatabaseName(), tablePath.getObjectName());
+       } catch (NoSuchObjectException e) {
+           throw new TableNotExistException(catalogName, tablePath);
+       } catch (TException e) {
+           throw new CatalogException(
+               String.format("Failed to get table %s from Hive metastore", tablePath.getFullName()), e);
+       }
+   }
}
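
The location-preservation branch in alterTable() leans on a Thrift detail worth calling out: the generated metastore classes track whether each optional field was ever set, so isSetLocation() cleanly distinguishes "no location" from any concrete value. A sketch with the Hive metastore API classes:

    import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
    import org.apache.hadoop.hive.metastore.api.Table;

    public class LocationReuseDemo {
        public static void main(String[] args) {
            Table oldTable = new Table();
            StorageDescriptor oldSd = new StorageDescriptor();
            oldSd.setLocation("hdfs:///warehouse/db1/t1");
            oldTable.setSd(oldSd);

            Table newTable = new Table();
            newTable.setSd(new StorageDescriptor());   // fresh SD, location never set

            // Same check alterTable() performs before calling client.alter_table().
            if (!newTable.getSd().isSetLocation()) {
                newTable.getSd().setLocation(oldTable.getSd().getLocation());
            }
            System.out.println(newTable.getSd().getLocation());  // hdfs:///warehouse/db1/t1
        }
    }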
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogTable.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogTable.java
new file mode 100644
index 0000000..87fb413
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogTable.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.hive;
+
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogTable;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A Hive catalog table implementation.
+ */
+public class HiveCatalogTable implements CatalogTable {
+ // Schema of the table (column names and types)
+ private final TableSchema tableSchema;
+   // Partition keys if this is a partitioned table. It's an empty set if the table is not partitioned
+   private final List<String> partitionKeys;
+   // Properties of the table
+   private final Map<String, String> properties;
+   // Comment of the table
+   private String comment = "This is a hive catalog table.";
+
+   public HiveCatalogTable(
+           TableSchema tableSchema,
+           List<String> partitionKeys,
+           Map<String, String> properties,
+           String comment) {
+       this.tableSchema = checkNotNull(tableSchema, "tableSchema cannot be null");
+       this.partitionKeys = checkNotNull(partitionKeys, "partitionKeys cannot be null");
+       this.properties = checkNotNull(properties, "properties cannot be null");
+       this.comment = comment;
+   }
+
+   public HiveCatalogTable(
+           TableSchema tableSchema,
+           Map<String, String> properties,
+           String comment) {
+       this(tableSchema, new ArrayList<>(), properties, comment);
+   }
+
+ @Override
+ public boolean isPartitioned() {
+ return !partitionKeys.isEmpty();
+ }
+
+ @Override
+ public List<String> getPartitionKeys() {
+ return partitionKeys;
+ }
+
+ @Override
+ public Map<String, String> getProperties() {
+ return properties;
+ }
+
+ @Override
+ public TableSchema getSchema() {
+ return tableSchema;
+ }
+
+ @Override
+ public String getComment() {
+ return comment;
+ }
+
+ @Override
+ public CatalogBaseTable copy() {
+       return new HiveCatalogTable(
+           tableSchema.copy(), new ArrayList<>(partitionKeys), new HashMap<>(properties), comment);
+   }
+
+ @Override
+ public Optional<String> getDescription() {
+ return Optional.ofNullable(comment);
+ }
+
+ @Override
+ public Optional<String> getDetailedDescription() {
+ // TODO: return a detailed description
+ return Optional.ofNullable(comment);
+ }
+}
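
Tying the new class to the column-split convention noted earlier, a small sketch of constructing a partitioned HiveCatalogTable; the partition key "dt" is deliberately the last schema column so createHiveTable() can peel it off the tail:

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.table.api.TableSchema;
    import org.apache.flink.table.catalog.hive.HiveCatalogTable;

    import java.util.Arrays;
    import java.util.HashMap;

    public class HiveCatalogTableDemo {
        public static void main(String[] args) {
            TableSchema schema = new TableSchema(
                new String[]{"name", "age", "dt"},
                new TypeInformation[]{Types.STRING, Types.INT, Types.STRING});

            HiveCatalogTable table = new HiveCatalogTable(
                schema,
                Arrays.asList("dt"),           // partition keys, trailing columns of the schema
                new HashMap<>(),               // table properties
                "a partitioned demo table");

            System.out.println(table.isPartitioned());     // true
            System.out.println(table.getPartitionKeys());  // [dt]
        }
    }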
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogUtil.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogUtil.java
index e972ba4..1a64a68 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogUtil.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogUtil.java
@@ -43,6 +43,5 @@ public class HiveCatalogUtil {
db.getComment(),
hiveCatalogDatabase.getLocation(),
hiveCatalogDatabase.getProperties());
-
}
}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveTableConfig.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveTableConfig.java
index 336d16f..e5063f2 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveTableConfig.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveTableConfig.java
@@ -19,11 +19,11 @@
package org.apache.flink.table.catalog.hive;
/**
- * Configs for Flink tables stored in Hive metastore.
+ * Configs for tables in Hive metastore.
*/
public class HiveTableConfig {
- // Description of the Flink table
+ // Comment of the Flink table
public static final String TABLE_COMMENT = "comment";
}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/GenericHiveMetastoreCatalogUtil.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/GenericHiveMetastoreCatalogUtil.java
index a88f083..7564e40 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/GenericHiveMetastoreCatalogUtil.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/GenericHiveMetastoreCatalogUtil.java
@@ -18,46 +18,15 @@
package org.apache.flink.table.catalog.hive.util;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogDatabase;
-import org.apache.flink.table.catalog.CatalogTable;
-import org.apache.flink.table.catalog.CatalogView;
-import org.apache.flink.table.catalog.GenericCatalogTable;
-import org.apache.flink.table.catalog.GenericCatalogView;
-import org.apache.flink.table.catalog.ObjectPath;
-import org.apache.flink.table.catalog.hive.HiveCatalogBaseUtil;
-import org.apache.flink.table.catalog.hive.HiveTableConfig;
-import org.apache.flink.table.catalog.hive.HiveTypeUtil;
-import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.Database;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.SerDeInfo;
-import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
-import org.apache.hadoop.hive.metastore.api.Table;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
/**
 * Utils to convert meta objects between Flink and Hive for GenericHiveMetastoreCatalog.
*/
public class GenericHiveMetastoreCatalogUtil {
-   // Prefix used to distinguish properties created by Hive and Flink,
-   // as Hive metastore has its own properties created upon table creation and migration between different versions of metastore.
-   private static final String FLINK_PROPERTY_PREFIX = "flink.";
-
-   // Flink tables should be stored as 'external' tables in Hive metastore
-   private static final Map<String, String> EXTERNAL_TABLE_PROPERTY = new HashMap<String, String>() {{
-       put("EXTERNAL", "TRUE");
-   }};
-
private GenericHiveMetastoreCatalogUtil() {
}
@@ -77,144 +46,4 @@ public class GenericHiveMetastoreCatalogUtil {
null,
catalogDatabase.getProperties());
}
-
-   /**
-    * Creates a Hive table from a CatalogBaseTable.
-    *
-    * @param tablePath path of the table
-    * @param table the CatalogBaseTable instance
-    * @return a Hive table
-    */
-   public static Table createHiveTable(ObjectPath tablePath, CatalogBaseTable table) {
-       Map<String, String> properties = new HashMap<>(table.getProperties());
-
-       // Table comment
-       properties.put(HiveTableConfig.TABLE_COMMENT, table.getComment());
-
-       Table hiveTable = new Table();
-       hiveTable.setDbName(tablePath.getDatabaseName());
-       hiveTable.setTableName(tablePath.getObjectName());
-       hiveTable.setCreateTime((int) (System.currentTimeMillis() / 1000));
-
-       // Table properties
-       hiveTable.setParameters(buildFlinkProperties(properties));
-       hiveTable.getParameters().putAll(EXTERNAL_TABLE_PROPERTY);
-
-       // Hive table's StorageDescriptor
-       StorageDescriptor sd = new StorageDescriptor();
-       sd.setSerdeInfo(new SerDeInfo(null, null, new HashMap<>()));
-
-       List<FieldSchema> allColumns = createHiveColumns(table.getSchema());
-
-       // Table columns and partition keys
-       if (table instanceof CatalogTable) {
-           CatalogTable catalogTable = (CatalogTable) table;
-
-           if (catalogTable.isPartitioned()) {
-               int partitionKeySize = catalogTable.getPartitionKeys().size();
-               List<FieldSchema> regularColumns = allColumns.subList(0, allColumns.size() - partitionKeySize);
-               List<FieldSchema> partitionColumns = allColumns.subList(allColumns.size() - partitionKeySize, allColumns.size());
-
-               sd.setCols(regularColumns);
-               hiveTable.setPartitionKeys(partitionColumns);
-           } else {
-               sd.setCols(allColumns);
-               hiveTable.setPartitionKeys(new ArrayList<>());
-           }
-
-           hiveTable.setTableType(TableType.EXTERNAL_TABLE.name());
-       } else if (table instanceof CatalogView) {
-           CatalogView view = (CatalogView) table;
-
-           // TODO: [FLINK-12398] Support partitioned view in catalog API
-           sd.setCols(allColumns);
-           hiveTable.setPartitionKeys(new ArrayList<>());
-
-           hiveTable.setViewOriginalText(view.getOriginalQuery());
-           hiveTable.setViewExpandedText(view.getExpandedQuery());
-           hiveTable.setTableType(TableType.VIRTUAL_VIEW.name());
-       } else {
-           throw new IllegalArgumentException(
-               "GenericHiveMetastoreCatalog only supports CatalogTable and CatalogView");
-       }
-
-       hiveTable.setSd(sd);
-
-       return hiveTable;
-   }
-
-   /**
-    * Creates a CatalogBaseTable from a Hive table.
-    *
-    * @param hiveTable the Hive table
-    * @return a CatalogBaseTable
-    */
-   public static CatalogBaseTable createCatalogTable(Table hiveTable) {
-       // Table schema
-       TableSchema tableSchema = HiveCatalogBaseUtil.createTableSchema(
-           hiveTable.getSd().getCols(), hiveTable.getPartitionKeys());
-
-       // Table properties
-       Map<String, String> properties = retrieveFlinkProperties(hiveTable.getParameters());
-
-       // Table comment
-       String comment = properties.remove(HiveTableConfig.TABLE_COMMENT);
-
-       // Partition keys
-       List<String> partitionKeys = new ArrayList<>();
-
-       if (!hiveTable.getPartitionKeys().isEmpty()) {
-           partitionKeys = hiveTable.getPartitionKeys().stream()
-               .map(fs -> fs.getName())
-               .collect(Collectors.toList());
-       }
-
-       if (TableType.valueOf(hiveTable.getTableType()) == TableType.VIRTUAL_VIEW) {
-           return new GenericCatalogView(
-               hiveTable.getViewOriginalText(),
-               hiveTable.getViewExpandedText(),
-               tableSchema,
-               properties,
-               comment
-           );
-       } else {
-           return new GenericCatalogTable(
-               tableSchema, partitionKeys, properties, comment);
-       }
-   }
-
-   /**
-    * Create Hive columns from Flink TableSchema.
-    */
-   private static List<FieldSchema> createHiveColumns(TableSchema schema) {
-       String[] fieldNames = schema.getFieldNames();
-       TypeInformation[] fieldTypes = schema.getFieldTypes();
-
-       List<FieldSchema> columns = new ArrayList<>(fieldNames.length);
-
-       for (int i = 0; i < fieldNames.length; i++) {
-           columns.add(
-               new FieldSchema(fieldNames[i], HiveTypeUtil.toHiveType(fieldTypes[i]), null));
-       }
-
-       return columns;
-   }
-
-   /**
-    * Filter out Hive-created properties, and return Flink-created properties.
-    */
-   private static Map<String, String> retrieveFlinkProperties(Map<String, String> hiveTableParams) {
-       return hiveTableParams.entrySet().stream()
-           .filter(e -> e.getKey().startsWith(FLINK_PROPERTY_PREFIX))
-           .collect(Collectors.toMap(e -> e.getKey().replace(FLINK_PROPERTY_PREFIX, ""), e -> e.getValue()));
-   }
-
-   /**
-    * Add a prefix to Flink-created properties to distinguish them from Hive-created properties.
-    */
-   public static Map<String, String> buildFlinkProperties(Map<String, String> properties) {
-       return properties.entrySet().stream()
-           .filter(e -> e.getKey() != null && e.getValue() != null)
-           .collect(Collectors.toMap(e -> FLINK_PROPERTY_PREFIX + e.getKey(), e -> e.getValue()));
-   }
}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBaseUtil.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java
similarity index 72%
rename from flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBaseUtil.java
rename to flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java
index 3f93b4a..c7c68f1 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBaseUtil.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java
@@ -16,10 +16,11 @@
* limitations under the License.
*/
-package org.apache.flink.table.catalog.hive;
+package org.apache.flink.table.catalog.hive.util;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.hive.HiveTypeUtil;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
@@ -28,16 +29,15 @@ import java.util.ArrayList;
import java.util.List;
/**
- * Shared util for catalogs backed by Hive-metastore.
+ * Utils for Hive-backed tables.
*/
-public class HiveCatalogBaseUtil {
+public class HiveTableUtil {
+
+ private HiveTableUtil() {
+ }
    /**
     * Create a Flink's TableSchema from Hive table's columns and partition keys.
-    *
-    * @param cols columns of the Hive table
-    * @param partitionKeys partition keys of the Hive table
-    * @return a Flink TableSchema
     */
    public static TableSchema createTableSchema(List<FieldSchema> cols, List<FieldSchema> partitionKeys) {
List<FieldSchema> allCols = new ArrayList<>(cols);
@@ -55,4 +55,21 @@ public class HiveCatalogBaseUtil {
return new TableSchema(colNames, colTypes);
}
+
+ /**
+ * Create Hive columns from Flink TableSchema.
+ */
+ public static List<FieldSchema> createHiveColumns(TableSchema schema) {
+ String[] fieldNames = schema.getFieldNames();
+ TypeInformation[] fieldTypes = schema.getFieldTypes();
+
+ List<FieldSchema> columns = new ArrayList<>(fieldNames.length);
+
+       for (int i = 0; i < fieldNames.length; i++) {
+           columns.add(
+               new FieldSchema(fieldNames[i], HiveTypeUtil.toHiveType(fieldTypes[i]), null));
+       }
+
+ return columns;
+ }
}
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogTest.java
index 390ab1d..ca7df2e 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogTest.java
@@ -26,7 +26,6 @@ import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.CatalogDatabase;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTestBase;
-import org.apache.flink.table.catalog.CatalogTestUtil;
import org.apache.flink.table.catalog.CatalogView;
import org.apache.flink.table.catalog.GenericCatalogDatabase;
import org.apache.flink.table.catalog.GenericCatalogTable;
@@ -88,7 +87,7 @@ public class GenericHiveMetastoreCatalogTest extends CatalogTestBase {
catalog.createDatabase(db1, createDb(), false);
catalog.createTable(path1, table, false);
-       CatalogTestUtil.checkEquals(table, (CatalogTable) catalog.getTable(path1));
+       checkEquals(table, (CatalogTable) catalog.getTable(path1));
}
// ------ utils ------
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogTest.java
index dfd7fcf..2b7e397 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogTest.java
@@ -28,82 +28,37 @@ import org.junit.BeforeClass;
import java.io.IOException;
import java.util.HashMap;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
/**
* Test for HiveCatalog.
*/
public class HiveCatalogTest extends CatalogTestBase {
+
@BeforeClass
public static void init() throws IOException {
- catalog = HiveTestUtils.createGenericHiveMetastoreCatalog();
+ catalog = HiveTestUtils.createHiveCatalog();
catalog.open();
}
// =====================
-   // HiveCatalog doesn't support table operation yet
-   // Thus, overriding the following tests which involve table operation in CatalogTestBase so they won't run against HiveCatalog
+   // HiveCatalog doesn't support streaming table operation. Ignore this test in CatalogTestBase.
// =====================
- // TODO: re-enable these tests once HiveCatalog support table operations
-   public void testDropDb_DatabaseNotEmptyException() throws Exception {
-   }
-
    public void testCreateTable_Streaming() throws Exception {
    }

-   public void testCreateTable_Batch() throws Exception {
-   }
-
-   public void testCreateTable_DatabaseNotExistException() throws Exception {
-   }
-
-   public void testCreateTable_TableAlreadyExistException() throws Exception {
-   }
-
-   public void testCreateTable_TableAlreadyExist_ignored() throws Exception {
-   }
-
-   public void testGetTable_TableNotExistException() throws Exception {
-   }
-
-   public void testGetTable_TableNotExistException_NoDb() throws Exception {
-   }
-
-   public void testDropTable_nonPartitionedTable() throws Exception {
-   }
-
-   public void testDropTable_TableNotExistException() throws Exception {
-   }
-
-   public void testDropTable_TableNotExist_ignored() throws Exception {
-   }
-
-   public void testAlterTable() throws Exception {
-   }
-
-   public void testAlterTable_TableNotExistException() throws Exception {
-   }
-
-   public void testAlterTable_TableNotExist_ignored() throws Exception {
-   }
-
-   public void testRenameTable_nonPartitionedTable() throws Exception {
-   }
-
-   public void testRenameTable_TableNotExistException() throws Exception {
-   }
-
-   public void testRenameTable_TableNotExistException_ignored() throws Exception {
-   }
+   // =====================
+   // HiveCatalog doesn't support view operation yet
+   // Thus, overriding the following tests which involve view operation in CatalogTestBase so they won't run against HiveCatalog
+   // =====================

-   public void testRenameTable_TableAlreadyExistException() throws Exception {
-   }
+   // TODO: re-enable these tests once HiveCatalog supports view operations
public void testListTables() throws Exception {
}
- public void testTableExists() throws Exception {
- }
-
public void testCreateView() throws Exception {
}
@@ -163,32 +118,43 @@ public class HiveCatalogTest extends CatalogTestBase {
    @Override
    public CatalogTable createTable() {
-       // TODO: implement this once HiveCatalog support table operations
-       return null;
+       return new HiveCatalogTable(
+           createTableSchema(),
+           getBatchTableProperties(),
+           TEST_COMMENT
+       );
    }

    @Override
    public CatalogTable createAnotherTable() {
-       // TODO: implement this once HiveCatalog support table operations
-       return null;
+       return new HiveCatalogTable(
+           createAnotherTableSchema(),
+           getBatchTableProperties(),
+           TEST_COMMENT
+       );
    }

    @Override
    public CatalogTable createStreamingTable() {
-       // TODO: implement this once HiveCatalog support table operations
-       return null;
+       throw new UnsupportedOperationException("HiveCatalog doesn't support streaming tables.");
    }

    @Override
    public CatalogTable createPartitionedTable() {
-       // TODO: implement this once HiveCatalog support table operations
-       return null;
+       return new HiveCatalogTable(
+           createTableSchema(),
+           createPartitionKeys(),
+           getBatchTableProperties(),
+           TEST_COMMENT);
    }

    @Override
    public CatalogTable createAnotherPartitionedTable() {
-       // TODO: implement this once HiveCatalog support table operations
-       return null;
+       return new HiveCatalogTable(
+           createAnotherTableSchema(),
+           createPartitionKeys(),
+           getBatchTableProperties(),
+           TEST_COMMENT);
    }
@Override
@@ -202,4 +168,16 @@ public class HiveCatalogTest extends CatalogTestBase {
// TODO: implement this once HiveCatalog support view operations
return null;
}
+
+   @Override
+   public void checkEquals(CatalogTable t1, CatalogTable t2) {
+       assertEquals(t1.getSchema(), t2.getSchema());
+       assertEquals(t1.getComment(), t2.getComment());
+       assertEquals(t1.getPartitionKeys(), t2.getPartitionKeys());
+       assertEquals(t1.isPartitioned(), t2.isPartitioned());
+
+       // Hive may add properties of its own,
+       // thus a Hive table's properties are a superset of those in its corresponding Flink table
+       assertTrue(t2.getProperties().entrySet().containsAll(t1.getProperties().entrySet()));
+   }
}
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java
index 4a32313..2618c8c 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java
@@ -40,6 +40,13 @@ public class HiveTestUtils {
        return new GenericHiveMetastoreCatalog(CatalogTestBase.TEST_CATALOG_NAME, getHiveConf());
    }

+   /**
+    * Create a HiveCatalog with an embedded Hive Metastore.
+    */
+   public static HiveCatalog createHiveCatalog() throws IOException {
+       return new HiveCatalog(CatalogTestBase.TEST_CATALOG_NAME, getHiveConf());
+   }
+
    private static HiveConf getHiveConf() throws IOException {
        ClassLoader classLoader = new HiveTestUtils().getClass().getClassLoader();
        HiveConf.setHiveSiteLocation(classLoader.getResource(HIVE_SITE_XML));
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
index 6fabb78..89cc0ee 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
@@ -97,12 +97,12 @@ public class GenericInMemoryCatalogTest extends CatalogTestBase {
        CatalogPartitionSpec catalogPartitionSpec = createPartitionSpec();
        catalog.createPartition(path1, catalogPartitionSpec, catalogPartition, false);
-       CatalogTestUtil.checkEquals(table, (CatalogTable) catalog.getTable(path1));
+       checkEquals(table, (CatalogTable) catalog.getTable(path1));
        assertTrue(catalog.partitionExists(path1, catalogPartitionSpec));
        catalog.renameTable(path1, t2, false);
-       CatalogTestUtil.checkEquals(table, (CatalogTable) catalog.getTable(path3));
+       checkEquals(table, (CatalogTable) catalog.getTable(path3));
        assertTrue(catalog.partitionExists(path3, catalogPartitionSpec));
        assertFalse(catalog.tableExists(path1));
        assertFalse(catalog.partitionExists(path1, catalogPartitionSpec));
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java
index 93c9ff1..ec37104 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java
@@ -244,7 +244,7 @@ public abstract class CatalogTestBase {
        CatalogTable table = createStreamingTable();
        catalog.createTable(path1, table, false);
-       CatalogTestUtil.checkEquals(table, (CatalogTable) catalog.getTable(path1));
+       checkEquals(table, (CatalogTable) catalog.getTable(path1));
    }
    @Test
@@ -257,7 +257,7 @@ public abstract class CatalogTestBase {
        CatalogBaseTable tableCreated = catalog.getTable(path1);
-       CatalogTestUtil.checkEquals(table, (CatalogTable) tableCreated);
+       checkEquals(table, (CatalogTable) tableCreated);
        assertEquals(TEST_COMMENT, tableCreated.getDescription().get());
        List<String> tables = catalog.listTables(db1);
@@ -271,7 +271,7 @@ public abstract class CatalogTestBase {
        table = createPartitionedTable();
        catalog.createTable(path1, table, false);
-       CatalogTestUtil.checkEquals(table, (CatalogTable) catalog.getTable(path1));
+       checkEquals(table, (CatalogTable) catalog.getTable(path1));
        tables = catalog.listTables(db1);
@@ -305,11 +305,11 @@ public abstract class CatalogTestBase {
        CatalogTable table = createTable();
        catalog.createTable(path1, table, false);
-       CatalogTestUtil.checkEquals(table, (CatalogTable) catalog.getTable(path1));
+       checkEquals(table, (CatalogTable) catalog.getTable(path1));
        catalog.createTable(path1, createAnotherTable(), true);
-       CatalogTestUtil.checkEquals(table, (CatalogTable) catalog.getTable(path1));
+       checkEquals(table, (CatalogTable) catalog.getTable(path1));
    }
    @Test
@@ -361,13 +361,13 @@ public abstract class CatalogTestBase {
        CatalogTable table = createTable();
        catalog.createTable(path1, table, false);
-       CatalogTestUtil.checkEquals(table, (CatalogTable) catalog.getTable(path1));
+       checkEquals(table, (CatalogTable) catalog.getTable(path1));
        CatalogTable newTable = createAnotherTable();
        catalog.alterTable(path1, newTable, false);
        assertNotEquals(table, catalog.getTable(path1));
-       CatalogTestUtil.checkEquals(newTable, (CatalogTable) catalog.getTable(path1));
+       checkEquals(newTable, (CatalogTable) catalog.getTable(path1));
        catalog.dropTable(path1, false);
@@ -375,12 +375,12 @@ public abstract class CatalogTestBase {
        table = createPartitionedTable();
        catalog.createTable(path1, table, false);
-       CatalogTestUtil.checkEquals(table, (CatalogTable) catalog.getTable(path1));
+       checkEquals(table, (CatalogTable) catalog.getTable(path1));
        newTable = createAnotherPartitionedTable();
        catalog.alterTable(path1, newTable, false);
-       CatalogTestUtil.checkEquals(newTable, (CatalogTable) catalog.getTable(path1));
+       checkEquals(newTable, (CatalogTable) catalog.getTable(path1));
    }
    @Test
@@ -404,11 +404,11 @@ public abstract class CatalogTestBase {
        CatalogTable table = createTable();
        catalog.createTable(path1, table, false);
-       CatalogTestUtil.checkEquals(table, (CatalogTable) catalog.getTable(path1));
+       checkEquals(table, (CatalogTable) catalog.getTable(path1));
        catalog.renameTable(path1, t2, false);
-       CatalogTestUtil.checkEquals(table, (CatalogTable) catalog.getTable(path3));
+       checkEquals(table, (CatalogTable) catalog.getTable(path3));
        assertFalse(catalog.tableExists(path1));
    }
@@ -692,4 +692,15 @@ public abstract class CatalogTestBase {
put(IS_STREAMING, "true");
}};
}
+
+ // ------ equality check utils ------
+   // Can be overridden by subclasses of this test base
+
+ protected void checkEquals(CatalogTable t1, CatalogTable t2) {
+ assertEquals(t1.getSchema(), t2.getSchema());
+ assertEquals(t1.getProperties(), t2.getProperties());
+ assertEquals(t1.getComment(), t2.getComment());
+ assertEquals(t1.getPartitionKeys(), t2.getPartitionKeys());
+ assertEquals(t1.isPartitioned(), t2.isPartitioned());
+ }
}
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java
index b131698..412c73c 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java
@@ -28,6 +28,7 @@ import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataLong;
import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataString;
import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
import org.apache.flink.table.catalog.stats.Date;
+import org.apache.flink.table.plan.stats.TableStats;
import java.util.Map;
@@ -36,16 +37,12 @@ import static org.junit.Assert.assertTrue;
/**
* Utility class for catalog testing.
+ * TODO: Move util methods to CatalogTestBase and remove this class
*/
public class CatalogTestUtil {
-
- public static void checkEquals(CatalogTable t1, CatalogTable t2) {
- assertEquals(t1.getSchema(), t2.getSchema());
- assertEquals(t1.getComment(), t2.getComment());
- assertEquals(t1.getProperties(), t2.getProperties());
- assertEquals(t1.getPartitionKeys(), t2.getPartitionKeys());
- assertEquals(t1.isPartitioned(), t2.isPartitioned());
- assertEquals(t1.getDescription(), t2.getDescription());
+   public static void checkEquals(TableStats ts1, TableStats ts2) {
+       assertEquals(ts1.getRowCount(), ts2.getRowCount());
+       assertEquals(ts1.getColumnStats().size(), ts2.getColumnStats().size());
    }
public static void checkEquals(CatalogView v1, CatalogView v2) {