bowenli86 commented on a change in pull request #8480: [FLINK-12552][table]:
Combine HiveCatalog and GenericHiveMetastoreCatalog
URL: https://github.com/apache/flink/pull/8480#discussion_r285404214
##########
File path:
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
##########
@@ -54,117 +75,494 @@
import java.util.Map;
import java.util.stream.Collectors;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
/**
- * A catalog implementation for Hive.
+ * Base class for catalogs backed by Hive metastore.
*/
-public class HiveCatalog extends HiveCatalogBase {
+public class HiveCatalog implements Catalog {
private static final Logger LOG =
LoggerFactory.getLogger(HiveCatalog.class);
+ private static final String DEFAULT_DB = "default";
- public HiveCatalog(String catalogName, String hivemetastoreURI) {
- super(catalogName, hivemetastoreURI);
+ // Prefix used to distinguish properties created by Hive and Flink,
+ // as Hive metastore has its own properties created upon table creation
and migration between different versions of metastore.
+ private static final String FLINK_PROPERTY_PREFIX = "flink.";
+ private static final String GENERC_META_PROPERTY_KEY =
"flink.is_generic";
- LOG.info("Created HiveCatalog '{}'", catalogName);
+ protected final String catalogName;
+ protected final HiveConf hiveConf;
+
+ private final String defaultDatabase;
+ protected IMetaStoreClient client;
+
+ public HiveCatalog(String catalogName, String hivemetastoreURI) {
+ this(catalogName, DEFAULT_DB, getHiveConf(hivemetastoreURI));
}
public HiveCatalog(String catalogName, HiveConf hiveConf) {
- super(catalogName, hiveConf);
+ this(catalogName, DEFAULT_DB, hiveConf);
+ }
+
+ public HiveCatalog(String catalogName, String defaultDatabase, HiveConf
hiveConf) {
+ checkArgument(!StringUtils.isNullOrWhitespaceOnly(catalogName),
"catalogName cannot be null or empty");
+
checkArgument(!StringUtils.isNullOrWhitespaceOnly(defaultDatabase),
"defaultDatabase cannot be null or empty");
+ this.catalogName = catalogName;
+ this.defaultDatabase = defaultDatabase;
+ this.hiveConf = checkNotNull(hiveConf, "hiveConf cannot be
null");
LOG.info("Created HiveCatalog '{}'", catalogName);
}
+ private static HiveConf getHiveConf(String hiveMetastoreURI) {
+
checkArgument(!StringUtils.isNullOrWhitespaceOnly(hiveMetastoreURI),
"hiveMetastoreURI cannot be null or empty");
+
+ HiveConf hiveConf = new HiveConf();
+ hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS,
hiveMetastoreURI);
+ return hiveConf;
+ }
+
+ private static IMetaStoreClient getMetastoreClient(HiveConf hiveConf) {
+ try {
+ return RetryingMetaStoreClient.getProxy(
+ hiveConf,
+ null,
+ null,
+ HiveMetaStoreClient.class.getName(),
+ true);
+ } catch (MetaException e) {
+ throw new CatalogException("Failed to create Hive
metastore client", e);
+ }
+ }
+
+ @Override
+ public void open() throws CatalogException {
+ if (client == null) {
+ client = getMetastoreClient(hiveConf);
+ LOG.info("Connected to Hive metastore");
+ }
+
+ if (!databaseExists(defaultDatabase)) {
+ throw new CatalogException(String.format("Configured
default database %s doesn't exist in catalog %s.",
+ defaultDatabase, catalogName));
+ }
+ }
+
+ @Override
+ public void close() throws CatalogException {
+ if (client != null) {
+ client.close();
+ client = null;
+ LOG.info("Close connection to Hive metastore");
+ }
+ }
+
// ------ databases ------
+ public String getDefaultDatabase() throws CatalogException {
+ return defaultDatabase;
+ }
+
@Override
- protected CatalogDatabase createCatalogDatabase(Database hiveDatabase) {
- return new HiveCatalogDatabase(
- hiveDatabase.getParameters(),
- hiveDatabase.getLocationUri(),
- hiveDatabase.getDescription());
+ public CatalogDatabase getDatabase(String databaseName) throws
DatabaseNotExistException, CatalogException {
+ Database hiveDatabase = getHiveDatabase(databaseName);
+
+ Map<String, String> properties = hiveDatabase.getParameters();
+ boolean isGeneric =
Boolean.valueOf(properties.get(GENERC_META_PROPERTY_KEY));
+ return !isGeneric ? new HiveCatalogDatabase(properties,
hiveDatabase.getLocationUri(), hiveDatabase.getDescription()) :
+ new
GenericCatalogDatabase(retrieveFlinkProperties(properties),
hiveDatabase.getDescription());
}
@Override
- protected Database createHiveDatabase(String databaseName,
CatalogDatabase catalogDatabase) {
- HiveCatalogDatabase hiveCatalogDatabase = (HiveCatalogDatabase)
catalogDatabase;
+ public void createDatabase(String databaseName, CatalogDatabase
database, boolean ignoreIfExists)
+ throws DatabaseAlreadyExistException, CatalogException {
+
checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName), "databaseName
cannot be null or empty");
+ checkNotNull(database, "database cannot be null");
+
+ Database hiveDatabase;
+ if (database instanceof HiveCatalogDatabase) {
+ hiveDatabase = instantiateHiveDatabase(databaseName,
(HiveCatalogDatabase) database);
+ } else if (database instanceof GenericCatalogDatabase) {
+ hiveDatabase = instantiateHiveDatabase(databaseName,
(GenericCatalogDatabase) database);
+ } else {
+ throw new CatalogException(String.format("Unsupported
catalog database type %s", database.getClass()), null);
+ }
- return new Database(
- databaseName,
- catalogDatabase.getComment(),
- hiveCatalogDatabase.getLocation(),
- hiveCatalogDatabase.getProperties());
+ createHiveDatabase(hiveDatabase, ignoreIfExists);
}
- // ------ tables and views------
+ private static Database instantiateHiveDatabase(String databaseName,
HiveCatalogDatabase database) {
+ return new Database(databaseName,
+ database.getComment(),
+ database.getLocation(),
+ database.getProperties());
+ }
+
+ private static Database instantiateHiveDatabase(String databaseName,
GenericCatalogDatabase database) {
+ // Add a property to make it as a generic catalog database
+ Map<String, String> properties = database.getProperties();
+
+ return new Database(databaseName,
+ database.getComment(),
+ // HDFS location URI which GenericCatalogDatabase
shouldn't care
+ null,
+ maskFlinkProperties(properties));
+ }
@Override
- protected void validateCatalogBaseTable(CatalogBaseTable table)
- throws CatalogException {
- if (!(table instanceof HiveCatalogTable) && !(table instanceof
HiveCatalogView)) {
+ public void alterDatabase(String databaseName, CatalogDatabase
newDatabase, boolean ignoreIfNotExists)
+ throws DatabaseNotExistException, CatalogException {
+
checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName), "databaseName
cannot be null or empty");
+ checkNotNull(newDatabase, "newDatabase cannot be null");
+
+ if (newDatabase instanceof HiveCatalogDatabase) {
+ alterHiveDatabase(databaseName,
instantiateHiveDatabase(databaseName, (HiveCatalogDatabase) newDatabase),
ignoreIfNotExists);
+ } else if (newDatabase instanceof GenericCatalogDatabase) {
+ alterHiveDatabase(databaseName,
instantiateHiveDatabase(databaseName, (GenericCatalogDatabase) newDatabase),
ignoreIfNotExists);
+ } else {
+ throw new CatalogException(String.format("Unsupported
catalog database type %s", newDatabase.getClass()), null);
+ }
+ }
+
+ @Override
+ public List<String> listDatabases() throws CatalogException {
+ try {
+ return client.getAllDatabases();
+ } catch (TException e) {
throw new CatalogException(
- "HiveCatalog can only operate on
HiveCatalogTable and HiveCatalogView.");
+ String.format("Failed to list all databases in
%s", catalogName), e);
}
}
@Override
- protected CatalogBaseTable createCatalogBaseTable(Table hiveTable) {
- // Table schema
- TableSchema tableSchema =
-
HiveTableUtil.createTableSchema(hiveTable.getSd().getCols(),
hiveTable.getPartitionKeys());
+ public boolean databaseExists(String databaseName) throws
CatalogException {
+ try {
+ return client.getDatabase(databaseName) != null;
+ } catch (NoSuchObjectException e) {
+ return false;
+ } catch (TException e) {
+ throw new CatalogException(
+ String.format("Failed to determine whether
database %s exists or not", databaseName), e);
+ }
+ }
+
+ @Override
+ public void dropDatabase(String name, boolean ignoreIfNotExists) throws
DatabaseNotExistException,
+ DatabaseNotEmptyException, CatalogException {
+ try {
+ client.dropDatabase(name, true, ignoreIfNotExists);
+ } catch (NoSuchObjectException e) {
+ if (!ignoreIfNotExists) {
+ throw new
DatabaseNotExistException(catalogName, name);
+ }
+ } catch (InvalidOperationException e) {
+ throw new DatabaseNotEmptyException(catalogName, name);
+ } catch (TException e) {
+ throw new CatalogException(String.format("Failed to
drop database %s", name), e);
+ }
+ }
+
+ private Database getHiveDatabase(String databaseName) throws
DatabaseNotExistException {
+ try {
+ return client.getDatabase(databaseName);
+ } catch (NoSuchObjectException e) {
+ throw new DatabaseNotExistException(catalogName,
databaseName);
+ } catch (TException e) {
+ throw new CatalogException(
+ String.format("Failed to get database %s from
%s", databaseName, catalogName), e);
+ }
+ }
+
+ private void createHiveDatabase(Database hiveDatabase, boolean
ignoreIfExists)
+ throws DatabaseAlreadyExistException, CatalogException {
+ try {
+ client.createDatabase(hiveDatabase);
+ } catch (AlreadyExistsException e) {
+ if (!ignoreIfExists) {
+ throw new
DatabaseAlreadyExistException(catalogName, hiveDatabase.getName());
+ }
+ } catch (TException e) {
+ throw new CatalogException(String.format("Failed to
create database %s", hiveDatabase.getName()), e);
+ }
+ }
+
+ private void alterHiveDatabase(String name, Database newHiveDatabase,
boolean ignoreIfNotExists)
+ throws DatabaseNotExistException, CatalogException {
+ try {
+ if (databaseExists(name)) {
+ client.alterDatabase(name, newHiveDatabase);
+ } else if (!ignoreIfNotExists) {
+ throw new
DatabaseNotExistException(catalogName, name);
+ }
+ } catch (TException e) {
+ throw new CatalogException(String.format("Failed to
alter database %s", name), e);
+ }
+ }
+
+ // ------ tables ------
+
+ @Override
+ public CatalogBaseTable getTable(ObjectPath tablePath) throws
TableNotExistException, CatalogException {
+ checkNotNull(tablePath, "tablePath cannot be null");
+
+ Table hiveTable = getHiveTable(tablePath);
+ return instantiateHiveCatalogTable(hiveTable);
+ }
+
+ @Override
+ public void createTable(ObjectPath tablePath, CatalogBaseTable table,
boolean ignoreIfExists)
+ throws TableAlreadyExistException,
DatabaseNotExistException, CatalogException {
+ checkNotNull(tablePath, "tablePath cannot be null");
+ checkNotNull(table, "table cannot be null");
+
+ if (!databaseExists(tablePath.getDatabaseName())) {
+ throw new DatabaseNotExistException(catalogName,
tablePath.getDatabaseName());
+ }
+
+ Table hiveTable = instantiateHiveTable(tablePath, table);
+
+ try {
+ client.createTable(hiveTable);
+ } catch (AlreadyExistsException e) {
+ if (!ignoreIfExists) {
+ throw new
TableAlreadyExistException(catalogName, tablePath);
+ }
+ } catch (TException e) {
+ throw new CatalogException(String.format("Failed to
create table %s", tablePath.getFullName()), e);
+ }
+ }
+
+ @Override
+ public void renameTable(ObjectPath tablePath, String newTableName,
boolean ignoreIfNotExists)
+ throws TableNotExistException,
TableAlreadyExistException, CatalogException {
+ checkNotNull(tablePath, "tablePath cannot be null");
+
checkArgument(!StringUtils.isNullOrWhitespaceOnly(newTableName), "newTableName
cannot be null or empty");
+
+ try {
+ // alter_table() doesn't throw a clear exception when
target table doesn't exist.
+ // Thus, check the table existence explicitly
+ if (tableExists(tablePath)) {
+ ObjectPath newPath = new
ObjectPath(tablePath.getDatabaseName(), newTableName);
+ // alter_table() doesn't throw a clear
exception when new table already exists.
+ // Thus, check the table existence explicitly
+ if (tableExists(newPath)) {
+ throw new
TableAlreadyExistException(catalogName, newPath);
+ } else {
+ Table table = getHiveTable(tablePath);
+ table.setTableName(newTableName);
+
client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(),
table);
+ }
+ } else if (!ignoreIfNotExists) {
+ throw new TableNotExistException(catalogName,
tablePath);
+ }
+ } catch (TException e) {
+ throw new CatalogException(
+ String.format("Failed to rename table %s",
tablePath.getFullName()), e);
+ }
+ }
+
+ @Override
+ public void alterTable(ObjectPath tablePath, CatalogBaseTable
newCatalogTable, boolean ignoreIfNotExists)
+ throws TableNotExistException, CatalogException {
+ checkNotNull(tablePath, "tablePath cannot be null");
+ checkNotNull(newCatalogTable, "newCatalogTable cannot be null");
+
+ if (!tableExists(tablePath)) {
+ if (!ignoreIfNotExists) {
+ throw new TableNotExistException(catalogName,
tablePath);
+ }
+ return;
+ }
+
+ Table oldTable = getHiveTable(tablePath);
+ TableType oldTableType =
TableType.valueOf(oldTable.getTableType());
+
+ if (oldTableType == TableType.VIRTUAL_VIEW) {
+ if (!(newCatalogTable instanceof CatalogView)) {
+ throw new CatalogException(
+ String.format("Table types don't match.
The existing table is a view, but the new catalog base table is not."));
+ }
+ // Else, do nothing
+ } else if ((oldTableType == TableType.MANAGED_TABLE)) {
+ if (!(newCatalogTable instanceof CatalogTable)) {
+ throw new CatalogException(
+ String.format("Table types don't match.
The existing table is a table, but the new catalog base table is not."));
+ }
+ // Else, do nothing
+ } else {
+ throw new CatalogException(
+ String.format("Hive table type '%s' is not
supported yet.",
+ oldTableType.name()));
+ }
+
+ Table newTable = instantiateHiveTable(tablePath,
newCatalogTable);
+
+ // client.alter_table() requires a valid location
+ // thus, if new table doesn't have that, it reuses location of
the old table
+ if (!newTable.getSd().isSetLocation()) {
+
newTable.getSd().setLocation(oldTable.getSd().getLocation());
+ }
+
+ try {
+ client.alter_table(tablePath.getDatabaseName(),
tablePath.getObjectName(), newTable);
+ } catch (TException e) {
+ throw new CatalogException(String.format("Failed to
rename table %s", tablePath.getFullName()), e);
+ }
+ }
+
+ @Override
+ public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
throws TableNotExistException, CatalogException {
+ checkNotNull(tablePath, "tablePath cannot be null");
+
+ try {
+ client.dropTable(
+ tablePath.getDatabaseName(),
+ tablePath.getObjectName(),
+ // Indicate whether associated data should be
deleted.
+ // Set to 'true' for now because Flink tables
shouldn't have data in Hive. Can be changed later if necessary
+ true,
+ ignoreIfNotExists);
+ } catch (NoSuchObjectException e) {
+ if (!ignoreIfNotExists) {
+ throw new TableNotExistException(catalogName,
tablePath);
+ }
+ } catch (TException e) {
+ throw new CatalogException(
+ String.format("Failed to drop table %s",
tablePath.getFullName()), e);
+ }
+ }
+
+ @Override
+ public List<String> listTables(String databaseName) throws
DatabaseNotExistException, CatalogException {
+
checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName), "databaseName
cannot be null or empty");
+
+ try {
+ return client.getAllTables(databaseName);
+ } catch (UnknownDBException e) {
+ throw new DatabaseNotExistException(catalogName,
databaseName);
+ } catch (TException e) {
+ throw new CatalogException(
+ String.format("Failed to list tables in
database %s", databaseName), e);
+ }
+ }
+
+ @Override
+ public List<String> listViews(String databaseName) throws
DatabaseNotExistException, CatalogException {
+
checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName), "databaseName
cannot be null or empty");
+
+ try {
+ return client.getTables(
+ databaseName,
+ null, // table pattern
+ TableType.VIRTUAL_VIEW);
+ } catch (UnknownDBException e) {
+ throw new DatabaseNotExistException(catalogName,
databaseName);
+ } catch (TException e) {
+ throw new CatalogException(
+ String.format("Failed to list views in database
%s", databaseName), e);
+ }
+ }
+
+ @Override
+ public boolean tableExists(ObjectPath tablePath) throws
CatalogException {
+ checkNotNull(tablePath, "tablePath cannot be null");
+
+ try {
+ return client.tableExists(tablePath.getDatabaseName(),
tablePath.getObjectName());
+ } catch (UnknownDBException e) {
+ return false;
+ } catch (TException e) {
+ throw new CatalogException(
+ String.format("Failed to check whether table %s
exists or not.", tablePath.getFullName()), e);
+ }
+ }
+
+ private Table getHiveTable(ObjectPath tablePath) throws
TableNotExistException {
+ try {
+ return client.getTable(tablePath.getDatabaseName(),
tablePath.getObjectName());
+ } catch (NoSuchObjectException e) {
+ throw new TableNotExistException(catalogName,
tablePath);
+ } catch (TException e) {
+ throw new CatalogException(
+ String.format("Failed to get table %s from Hive
metastore", tablePath.getFullName()), e);
+ }
+ }
+
+ private static CatalogBaseTable instantiateHiveCatalogTable(Table
hiveTable) {
+ boolean isView = TableType.valueOf(hiveTable.getTableType()) ==
TableType.VIRTUAL_VIEW;
// Table properties
Map<String, String> properties = hiveTable.getParameters();
-
- // Table comment
+ boolean isGeneric =
Boolean.valueOf(properties.get(GENERC_META_PROPERTY_KEY));
+ if (isGeneric) {
+ properties = retrieveFlinkProperties(properties);
+ }
String comment =
properties.remove(HiveTableConfig.TABLE_COMMENT);
+ // Table schema
+ TableSchema tableSchema =
+
HiveTableUtil.createTableSchema(hiveTable.getSd().getCols(),
hiveTable.getPartitionKeys());
+
// Partition keys
List<String> partitionKeys = new ArrayList<>();
-
if (!hiveTable.getPartitionKeys().isEmpty()) {
- partitionKeys = hiveTable.getPartitionKeys().stream()
- .map(fs -> fs.getName())
- .collect(Collectors.toList());
+ partitionKeys =
hiveTable.getPartitionKeys().stream().map(fs ->
fs.getName()).collect(Collectors.toList());
}
- if (TableType.valueOf(hiveTable.getTableType()) ==
TableType.VIRTUAL_VIEW) {
- return new HiveCatalogView(
- hiveTable.getViewOriginalText(),
- hiveTable.getViewExpandedText(),
- tableSchema,
- properties,
- comment
- );
+ if (isView) {
+ if (isGeneric) {
+ return new GenericCatalogView(
+ hiveTable.getViewOriginalText(),
+ hiveTable.getViewExpandedText(),
+ tableSchema,
+ properties,
+ comment
+ );
+ } else {
+ return new HiveCatalogView(
+ hiveTable.getViewOriginalText(),
+ hiveTable.getViewExpandedText(),
+ tableSchema,
+ properties,
+ comment
+ );
+ }
} else {
- return new HiveCatalogTable(
- tableSchema, partitionKeys, properties,
comment);
+ if (isGeneric) {
+ return new GenericCatalogTable(tableSchema,
partitionKeys, properties, comment);
+ } else {
+ return new HiveCatalogTable(tableSchema,
partitionKeys, properties, comment);
+ }
}
}
- @Override
- protected Table createHiveTable(ObjectPath tablePath, CatalogBaseTable
table) {
- Map<String, String> properties = new
HashMap<>(table.getProperties());
-
- // Table comment
- properties.put(HiveTableConfig.TABLE_COMMENT,
table.getComment());
-
+ private static Table instantiateHiveTable(ObjectPath tablePath,
CatalogBaseTable table) {
Table hiveTable = new Table();
hiveTable.setDbName(tablePath.getDatabaseName());
hiveTable.setTableName(tablePath.getObjectName());
hiveTable.setCreateTime((int) (System.currentTimeMillis() /
1000));
+ Map<String, String> properties = new
HashMap<>(table.getProperties());
+ // Table comment
+ properties.put(HiveTableConfig.TABLE_COMMENT,
table.getComment());
+ if (table instanceof GenericCatalogTable || table instanceof
GenericCatalogView) {
+ properties = maskFlinkProperties(properties);
Review comment:
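Do we need to add the "flink.is_generic" key (GENERC_META_PROPERTY_KEY) to the
properties here? instantiateHiveCatalogTable reads that key to decide whether
the table is generic.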
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services