bowenli86 commented on a change in pull request #8480: [FLINK-12552][table]:
Combine HiveCatalog and GenericHiveMetastoreCat…
URL: https://github.com/apache/flink/pull/8480#discussion_r285670976
##########
File path:
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
##########
@@ -54,117 +76,491 @@
import java.util.Map;
import java.util.stream.Collectors;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
/**
* A catalog implementation for Hive.
*/
-public class HiveCatalog extends HiveCatalogBase {
+public class HiveCatalog implements Catalog {
private static final Logger LOG =
LoggerFactory.getLogger(HiveCatalog.class);
+ private static final String DEFAULT_DB = "default";
- public HiveCatalog(String catalogName, String hivemetastoreURI) {
- super(catalogName, hivemetastoreURI);
+ // Prefix used to distinguish properties created by Hive and Flink,
+ // as Hive metastore has its own properties created upon table creation
and migration between different versions of metastore.
+ private static final String FLINK_PROPERTY_PREFIX = "flink.";
+ private static final String FLINK_PROPERTY_IS_GENERIC =
FLINK_PROPERTY_PREFIX + GenericInMemoryCatalog.FLINK_IS_GENERIC_KEY;
- LOG.info("Created HiveCatalog '{}'", catalogName);
+ protected final String catalogName;
+ protected final HiveConf hiveConf;
+
+ private final String defaultDatabase;
+ protected IMetaStoreClient client;
+
+ public HiveCatalog(String catalogName, String hivemetastoreURI) {
+ this(catalogName, DEFAULT_DB, getHiveConf(hivemetastoreURI));
}
public HiveCatalog(String catalogName, HiveConf hiveConf) {
- super(catalogName, hiveConf);
+ this(catalogName, DEFAULT_DB, hiveConf);
+ }
+
+ public HiveCatalog(String catalogName, String defaultDatabase, HiveConf
hiveConf) {
+ checkArgument(!StringUtils.isNullOrWhitespaceOnly(catalogName),
"catalogName cannot be null or empty");
+
checkArgument(!StringUtils.isNullOrWhitespaceOnly(defaultDatabase),
"defaultDatabase cannot be null or empty");
+ this.catalogName = catalogName;
+ this.defaultDatabase = defaultDatabase;
+ this.hiveConf = checkNotNull(hiveConf, "hiveConf cannot be
null");
LOG.info("Created HiveCatalog '{}'", catalogName);
}
+ private static HiveConf getHiveConf(String hiveMetastoreURI) {
+
checkArgument(!StringUtils.isNullOrWhitespaceOnly(hiveMetastoreURI),
"hiveMetastoreURI cannot be null or empty");
+
+ HiveConf hiveConf = new HiveConf();
+ hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS,
hiveMetastoreURI);
+ return hiveConf;
+ }
+
+ private static IMetaStoreClient getMetastoreClient(HiveConf hiveConf) {
+ try {
+ return RetryingMetaStoreClient.getProxy(
+ hiveConf,
+ null,
+ null,
+ HiveMetaStoreClient.class.getName(),
+ true);
+ } catch (MetaException e) {
+ throw new CatalogException("Failed to create Hive
metastore client", e);
+ }
+ }
+
+ @Override
+ public void open() throws CatalogException {
+ if (client == null) {
+ client = getMetastoreClient(hiveConf);
+ LOG.info("Connected to Hive metastore");
+ }
+
+ if (!databaseExists(defaultDatabase)) {
+ throw new CatalogException(String.format("Configured
default database %s doesn't exist in catalog %s.",
+ defaultDatabase, catalogName));
+ }
+ }
+
+ @Override
+ public void close() throws CatalogException {
+ if (client != null) {
+ client.close();
+ client = null;
+ LOG.info("Close connection to Hive metastore");
+ }
+ }
+
// ------ databases ------
+ public String getDefaultDatabase() throws CatalogException {
+ return defaultDatabase;
+ }
+
@Override
- protected CatalogDatabase createCatalogDatabase(Database hiveDatabase) {
- return new HiveCatalogDatabase(
- hiveDatabase.getParameters(),
- hiveDatabase.getLocationUri(),
- hiveDatabase.getDescription());
+ public CatalogDatabase getDatabase(String databaseName) throws
DatabaseNotExistException, CatalogException {
+ Database hiveDatabase = getHiveDatabase(databaseName);
+
+ Map<String, String> properties = hiveDatabase.getParameters();
+ boolean isGeneric =
Boolean.valueOf(properties.get(FLINK_PROPERTY_IS_GENERIC));
+ return !isGeneric ? new HiveCatalogDatabase(properties,
hiveDatabase.getLocationUri(), hiveDatabase.getDescription()) :
+ new
GenericCatalogDatabase(retrieveFlinkProperties(properties),
hiveDatabase.getDescription());
+ }
+
+ @Override
+ public void createDatabase(String databaseName, CatalogDatabase
database, boolean ignoreIfExists)
+ throws DatabaseAlreadyExistException, CatalogException {
+
checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName), "databaseName
cannot be null or empty");
+ checkNotNull(database, "database cannot be null");
+
+ Database hiveDatabase;
+ if (database instanceof HiveCatalogDatabase) {
+ hiveDatabase = instantiateHiveDatabase(databaseName,
(HiveCatalogDatabase) database);
+ } else if (database instanceof GenericCatalogDatabase) {
+ hiveDatabase = instantiateHiveDatabase(databaseName,
(GenericCatalogDatabase) database);
+ } else {
+ throw new CatalogException(String.format("Unsupported
catalog database type %s", database.getClass()), null);
+ }
+
+ createHiveDatabase(hiveDatabase, ignoreIfExists);
Review comment:
Can we remove `createHiveDatabase()` and inline its logic here, given that
nothing else calls `createHiveDatabase()`?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services