bowenli86 commented on a change in pull request #8480: [FLINK-12552][table]: 
Combine HiveCatalog and GenericHiveMetastoreCatalog
URL: https://github.com/apache/flink/pull/8480#discussion_r285392260
 
 

 ##########
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
 ##########
 @@ -54,117 +75,494 @@
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
- * A catalog implementation for Hive.
+ * Base class for catalogs backed by Hive metastore.
  */
-public class HiveCatalog extends HiveCatalogBase {
+public class HiveCatalog implements Catalog {
        private static final Logger LOG = 
LoggerFactory.getLogger(HiveCatalog.class);
+       private static final String DEFAULT_DB = "default";
 
-       public HiveCatalog(String catalogName, String hivemetastoreURI) {
-               super(catalogName, hivemetastoreURI);
+       // Prefix used to distinguish properties created by Hive and Flink,
+       // as Hive metastore has its own properties created upon table creation 
and migration between different versions of metastore.
+       private static final String FLINK_PROPERTY_PREFIX = "flink.";
+       private static final String GENERC_META_PROPERTY_KEY = 
"flink.is_generic";
 
-               LOG.info("Created HiveCatalog '{}'", catalogName);
+       protected final String catalogName;
+       protected final HiveConf hiveConf;
+
+       private final String defaultDatabase;
+       protected IMetaStoreClient client;
+
+       public HiveCatalog(String catalogName, String hivemetastoreURI) {
+               this(catalogName, DEFAULT_DB, getHiveConf(hivemetastoreURI));
        }
 
        public HiveCatalog(String catalogName, HiveConf hiveConf) {
-               super(catalogName, hiveConf);
+               this(catalogName, DEFAULT_DB, hiveConf);
+       }
+
+       public HiveCatalog(String catalogName, String defaultDatabase, HiveConf 
hiveConf) {
+               checkArgument(!StringUtils.isNullOrWhitespaceOnly(catalogName), 
"catalogName cannot be null or empty");
+               
checkArgument(!StringUtils.isNullOrWhitespaceOnly(defaultDatabase), 
"defaultDatabase cannot be null or empty");
+               this.catalogName = catalogName;
+               this.defaultDatabase = defaultDatabase;
+               this.hiveConf = checkNotNull(hiveConf, "hiveConf cannot be 
null");
 
                LOG.info("Created HiveCatalog '{}'", catalogName);
        }
 
+       private static HiveConf getHiveConf(String hiveMetastoreURI) {
+               
checkArgument(!StringUtils.isNullOrWhitespaceOnly(hiveMetastoreURI), 
"hiveMetastoreURI cannot be null or empty");
+
+               HiveConf hiveConf = new HiveConf();
+               hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, 
hiveMetastoreURI);
+               return hiveConf;
+       }
+
+       private static IMetaStoreClient getMetastoreClient(HiveConf hiveConf) {
+               try {
+                       return RetryingMetaStoreClient.getProxy(
+                               hiveConf,
+                               null,
+                               null,
+                               HiveMetaStoreClient.class.getName(),
+                               true);
+               } catch (MetaException e) {
+                       throw new CatalogException("Failed to create Hive 
metastore client", e);
+               }
+       }
+
+       @Override
+       public void open() throws CatalogException {
+               if (client == null) {
+                       client = getMetastoreClient(hiveConf);
+                       LOG.info("Connected to Hive metastore");
+               }
+
+               if (!databaseExists(defaultDatabase)) {
+                       throw new CatalogException(String.format("Configured 
default database %s doesn't exist in catalog %s.",
+                               defaultDatabase, catalogName));
+               }
+       }
+
+       @Override
+       public void close() throws CatalogException {
+               if (client != null) {
+                       client.close();
+                       client = null;
+                       LOG.info("Close connection to Hive metastore");
+               }
+       }
+
        // ------ databases ------
 
+       public String getDefaultDatabase() throws CatalogException {
+               return defaultDatabase;
+       }
+
        @Override
-       protected CatalogDatabase createCatalogDatabase(Database hiveDatabase) {
-               return new HiveCatalogDatabase(
-                       hiveDatabase.getParameters(),
-                       hiveDatabase.getLocationUri(),
-                       hiveDatabase.getDescription());
+       public CatalogDatabase getDatabase(String databaseName) throws 
DatabaseNotExistException, CatalogException {
+               Database hiveDatabase = getHiveDatabase(databaseName);
+
+               Map<String, String> properties = hiveDatabase.getParameters();
+               boolean isGeneric = 
Boolean.valueOf(properties.get(GENERC_META_PROPERTY_KEY));
+               return !isGeneric ? new HiveCatalogDatabase(properties, 
hiveDatabase.getLocationUri(), hiveDatabase.getDescription()) :
+                       new 
GenericCatalogDatabase(retrieveFlinkProperties(properties), 
hiveDatabase.getDescription());
        }
 
        @Override
-       protected Database createHiveDatabase(String databaseName, 
CatalogDatabase catalogDatabase) {
-               HiveCatalogDatabase hiveCatalogDatabase = (HiveCatalogDatabase) 
catalogDatabase;
+       public void createDatabase(String databaseName, CatalogDatabase 
database, boolean ignoreIfExists)
+                       throws DatabaseAlreadyExistException, CatalogException {
+               
checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName), "databaseName 
cannot be null or empty");
+               checkNotNull(database, "database cannot be null");
+
+               Database hiveDatabase;
+               if (database instanceof HiveCatalogDatabase) {
+                       hiveDatabase = instantiateHiveDatabase(databaseName, 
(HiveCatalogDatabase) database);
+               } else if (database instanceof GenericCatalogDatabase) {
+                       hiveDatabase = instantiateHiveDatabase(databaseName, 
(GenericCatalogDatabase) database);
+               } else {
+                       throw new CatalogException(String.format("Unsupported 
catalog database type %s", database.getClass()), null);
+               }
 
-               return new Database(
-                       databaseName,
-                       catalogDatabase.getComment(),
-                       hiveCatalogDatabase.getLocation(),
-                       hiveCatalogDatabase.getProperties());
+               createHiveDatabase(hiveDatabase, ignoreIfExists);
        }
 
-       // ------ tables and views------
+       private static Database instantiateHiveDatabase(String databaseName, 
HiveCatalogDatabase database) {
+               return new Database(databaseName,
+                       database.getComment(),
+                       database.getLocation(),
+                       database.getProperties());
+       }
+
+       private static Database instantiateHiveDatabase(String databaseName, 
GenericCatalogDatabase database) {
+               // Add a property to make it as a generic catalog database
 
 Review comment:
   Where is the property added?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to