bowenli86 commented on a change in pull request #8480: [FLINK-12552][table]: 
Combine HiveCatalog and GenericHiveMetastoreCatalog
URL: https://github.com/apache/flink/pull/8480#discussion_r285404166
 
 

 ##########
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
 ##########
 @@ -54,117 +75,494 @@
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
- * A catalog implementation for Hive.
+ * Base class for catalogs backed by Hive metastore.
  */
-public class HiveCatalog extends HiveCatalogBase {
+public class HiveCatalog implements Catalog {
        private static final Logger LOG = 
LoggerFactory.getLogger(HiveCatalog.class);
+       private static final String DEFAULT_DB = "default";
 
-       public HiveCatalog(String catalogName, String hivemetastoreURI) {
-               super(catalogName, hivemetastoreURI);
+       // Prefix used to distinguish properties created by Hive and Flink,
+       // as Hive metastore has its own properties created upon table creation 
and migration between different versions of metastore.
+       private static final String FLINK_PROPERTY_PREFIX = "flink.";
+       private static final String GENERC_META_PROPERTY_KEY = 
"flink.is_generic";
 
-               LOG.info("Created HiveCatalog '{}'", catalogName);
+       protected final String catalogName;
+       protected final HiveConf hiveConf;
+
+       private final String defaultDatabase;
+       protected IMetaStoreClient client;
+
+       public HiveCatalog(String catalogName, String hivemetastoreURI) {
+               this(catalogName, DEFAULT_DB, getHiveConf(hivemetastoreURI));
        }
 
        public HiveCatalog(String catalogName, HiveConf hiveConf) {
-               super(catalogName, hiveConf);
+               this(catalogName, DEFAULT_DB, hiveConf);
+       }
+
+       public HiveCatalog(String catalogName, String defaultDatabase, HiveConf 
hiveConf) {
+               checkArgument(!StringUtils.isNullOrWhitespaceOnly(catalogName), 
"catalogName cannot be null or empty");
+               
checkArgument(!StringUtils.isNullOrWhitespaceOnly(defaultDatabase), 
"defaultDatabase cannot be null or empty");
+               this.catalogName = catalogName;
+               this.defaultDatabase = defaultDatabase;
+               this.hiveConf = checkNotNull(hiveConf, "hiveConf cannot be 
null");
 
                LOG.info("Created HiveCatalog '{}'", catalogName);
        }
 
+       private static HiveConf getHiveConf(String hiveMetastoreURI) {
+               
checkArgument(!StringUtils.isNullOrWhitespaceOnly(hiveMetastoreURI), 
"hiveMetastoreURI cannot be null or empty");
+
+               HiveConf hiveConf = new HiveConf();
+               hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, 
hiveMetastoreURI);
+               return hiveConf;
+       }
+
+       private static IMetaStoreClient getMetastoreClient(HiveConf hiveConf) {
+               try {
+                       return RetryingMetaStoreClient.getProxy(
+                               hiveConf,
+                               null,
+                               null,
+                               HiveMetaStoreClient.class.getName(),
+                               true);
+               } catch (MetaException e) {
+                       throw new CatalogException("Failed to create Hive 
metastore client", e);
+               }
+       }
+
+       @Override
+       public void open() throws CatalogException {
+               if (client == null) {
+                       client = getMetastoreClient(hiveConf);
+                       LOG.info("Connected to Hive metastore");
+               }
+
+               if (!databaseExists(defaultDatabase)) {
+                       throw new CatalogException(String.format("Configured 
default database %s doesn't exist in catalog %s.",
+                               defaultDatabase, catalogName));
+               }
+       }
+
+       @Override
+       public void close() throws CatalogException {
+               if (client != null) {
+                       client.close();
+                       client = null;
+                       LOG.info("Close connection to Hive metastore");
+               }
+       }
+
        // ------ databases ------
 
+       public String getDefaultDatabase() throws CatalogException {
+               return defaultDatabase;
+       }
+
        @Override
-       protected CatalogDatabase createCatalogDatabase(Database hiveDatabase) {
-               return new HiveCatalogDatabase(
-                       hiveDatabase.getParameters(),
-                       hiveDatabase.getLocationUri(),
-                       hiveDatabase.getDescription());
+       public CatalogDatabase getDatabase(String databaseName) throws 
DatabaseNotExistException, CatalogException {
+               Database hiveDatabase = getHiveDatabase(databaseName);
+
+               Map<String, String> properties = hiveDatabase.getParameters();
+               boolean isGeneric = 
Boolean.valueOf(properties.get(GENERC_META_PROPERTY_KEY));
+               return !isGeneric ? new HiveCatalogDatabase(properties, 
hiveDatabase.getLocationUri(), hiveDatabase.getDescription()) :
+                       new 
GenericCatalogDatabase(retrieveFlinkProperties(properties), 
hiveDatabase.getDescription());
        }
 
        @Override
-       protected Database createHiveDatabase(String databaseName, 
CatalogDatabase catalogDatabase) {
-               HiveCatalogDatabase hiveCatalogDatabase = (HiveCatalogDatabase) 
catalogDatabase;
+       public void createDatabase(String databaseName, CatalogDatabase 
database, boolean ignoreIfExists)
+                       throws DatabaseAlreadyExistException, CatalogException {
+               
checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName), "databaseName 
cannot be null or empty");
+               checkNotNull(database, "database cannot be null");
+
+               Database hiveDatabase;
+               if (database instanceof HiveCatalogDatabase) {
+                       hiveDatabase = instantiateHiveDatabase(databaseName, 
(HiveCatalogDatabase) database);
+               } else if (database instanceof GenericCatalogDatabase) {
+                       hiveDatabase = instantiateHiveDatabase(databaseName, 
(GenericCatalogDatabase) database);
+               } else {
+                       throw new CatalogException(String.format("Unsupported 
catalog database type %s", database.getClass()), null);
+               }
 
-               return new Database(
-                       databaseName,
-                       catalogDatabase.getComment(),
-                       hiveCatalogDatabase.getLocation(),
-                       hiveCatalogDatabase.getProperties());
+               createHiveDatabase(hiveDatabase, ignoreIfExists);
        }
 
-       // ------ tables and views------
+       private static Database instantiateHiveDatabase(String databaseName, 
HiveCatalogDatabase database) {
+               return new Database(databaseName,
+                       database.getComment(),
+                       database.getLocation(),
+                       database.getProperties());
+       }
+
+       private static Database instantiateHiveDatabase(String databaseName, 
GenericCatalogDatabase database) {
+               // Add a property to make it as a generic catalog database
+               Map<String, String> properties = database.getProperties();
+
+               return new Database(databaseName,
+                       database.getComment(),
+                       // HDFS location URI which GenericCatalogDatabase 
shouldn't care
+                       null,
+                       maskFlinkProperties(properties));
+       }
 
        @Override
-       protected void validateCatalogBaseTable(CatalogBaseTable table)
-                       throws CatalogException {
-               if (!(table instanceof HiveCatalogTable) && !(table instanceof 
HiveCatalogView)) {
+       public void alterDatabase(String databaseName, CatalogDatabase 
newDatabase, boolean ignoreIfNotExists)
+                       throws DatabaseNotExistException, CatalogException {
+               
checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName), "databaseName 
cannot be null or empty");
+               checkNotNull(newDatabase, "newDatabase cannot be null");
+
+               if (newDatabase instanceof HiveCatalogDatabase) {
+                       alterHiveDatabase(databaseName, 
instantiateHiveDatabase(databaseName, (HiveCatalogDatabase) newDatabase), 
ignoreIfNotExists);
+               } else if (newDatabase instanceof GenericCatalogDatabase) {
+                       alterHiveDatabase(databaseName, 
instantiateHiveDatabase(databaseName, (GenericCatalogDatabase) newDatabase), 
ignoreIfNotExists);
+               } else {
+                       throw new CatalogException(String.format("Unsupported 
catalog database type %s", newDatabase.getClass()), null);
+               }
+       }
+
+       @Override
+       public List<String> listDatabases() throws CatalogException {
+               try {
+                       return client.getAllDatabases();
+               } catch (TException e) {
                        throw new CatalogException(
-                               "HiveCatalog can only operate on 
HiveCatalogTable and HiveCatalogView.");
+                               String.format("Failed to list all databases in 
%s", catalogName), e);
                }
        }
 
        @Override
-       protected CatalogBaseTable createCatalogBaseTable(Table hiveTable) {
-               // Table schema
-               TableSchema tableSchema =
-                       
HiveTableUtil.createTableSchema(hiveTable.getSd().getCols(), 
hiveTable.getPartitionKeys());
+       public boolean databaseExists(String databaseName) throws 
CatalogException {
+               try {
+                       return client.getDatabase(databaseName) != null;
+               } catch (NoSuchObjectException e) {
+                       return false;
+               } catch (TException e) {
+                       throw new CatalogException(
+                               String.format("Failed to determine whether 
database %s exists or not", databaseName), e);
+               }
+       }
+
+       @Override
+       public void dropDatabase(String name, boolean ignoreIfNotExists) throws 
DatabaseNotExistException,
+                       DatabaseNotEmptyException, CatalogException {
+               try {
+                       client.dropDatabase(name, true, ignoreIfNotExists);
+               } catch (NoSuchObjectException e) {
+                       if (!ignoreIfNotExists) {
+                               throw new 
DatabaseNotExistException(catalogName, name);
+                       }
+               } catch (InvalidOperationException e) {
+                       throw new DatabaseNotEmptyException(catalogName, name);
+               } catch (TException e) {
+                       throw new CatalogException(String.format("Failed to 
drop database %s", name), e);
+               }
+       }
+
+       private Database getHiveDatabase(String databaseName) throws 
DatabaseNotExistException {
+               try {
+                       return client.getDatabase(databaseName);
+               } catch (NoSuchObjectException e) {
+                       throw new DatabaseNotExistException(catalogName, 
databaseName);
+               } catch (TException e) {
+                       throw new CatalogException(
+                               String.format("Failed to get database %s from 
%s", databaseName, catalogName), e);
+               }
+       }
+
+       private void createHiveDatabase(Database hiveDatabase, boolean 
ignoreIfExists)
+                       throws DatabaseAlreadyExistException, CatalogException {
+               try {
+                       client.createDatabase(hiveDatabase);
+               } catch (AlreadyExistsException e) {
+                       if (!ignoreIfExists) {
+                               throw new 
DatabaseAlreadyExistException(catalogName, hiveDatabase.getName());
+                       }
+               } catch (TException e) {
+                       throw new CatalogException(String.format("Failed to 
create database %s", hiveDatabase.getName()), e);
+               }
+       }
+
+       private void alterHiveDatabase(String name, Database newHiveDatabase, 
boolean ignoreIfNotExists)
+                       throws DatabaseNotExistException, CatalogException {
+               try {
+                       if (databaseExists(name)) {
+                               client.alterDatabase(name, newHiveDatabase);
+                       } else if (!ignoreIfNotExists) {
+                               throw new 
DatabaseNotExistException(catalogName, name);
+                       }
+               } catch (TException e) {
+                       throw new CatalogException(String.format("Failed to 
alter database %s", name), e);
+               }
+       }
+
+       // ------ tables ------
+
+       @Override
+       public CatalogBaseTable getTable(ObjectPath tablePath) throws 
TableNotExistException, CatalogException {
+               checkNotNull(tablePath, "tablePath cannot be null");
+
+               Table hiveTable = getHiveTable(tablePath);
+               return instantiateHiveCatalogTable(hiveTable);
+       }
+
+       @Override
+       public void createTable(ObjectPath tablePath, CatalogBaseTable table, 
boolean ignoreIfExists)
+                       throws TableAlreadyExistException, 
DatabaseNotExistException, CatalogException {
+               checkNotNull(tablePath, "tablePath cannot be null");
+               checkNotNull(table, "table cannot be null");
+
+               if (!databaseExists(tablePath.getDatabaseName())) {
+                       throw new DatabaseNotExistException(catalogName, 
tablePath.getDatabaseName());
+               }
+
+               Table hiveTable = instantiateHiveTable(tablePath, table);
+
+               try {
+                       client.createTable(hiveTable);
+               } catch (AlreadyExistsException e) {
+                       if (!ignoreIfExists) {
+                               throw new 
TableAlreadyExistException(catalogName, tablePath);
+                       }
+               } catch (TException e) {
+                       throw new CatalogException(String.format("Failed to 
create table %s", tablePath.getFullName()), e);
+               }
+       }
+
+       @Override
+       public void renameTable(ObjectPath tablePath, String newTableName, 
boolean ignoreIfNotExists)
+                       throws TableNotExistException, 
TableAlreadyExistException, CatalogException {
+               checkNotNull(tablePath, "tablePath cannot be null");
+               
checkArgument(!StringUtils.isNullOrWhitespaceOnly(newTableName), "newTableName 
cannot be null or empty");
+
+               try {
+                       // alter_table() doesn't throw a clear exception when 
target table doesn't exist.
+                       // Thus, check the table existence explicitly
+                       if (tableExists(tablePath)) {
+                               ObjectPath newPath = new 
ObjectPath(tablePath.getDatabaseName(), newTableName);
+                               // alter_table() doesn't throw a clear 
exception when new table already exists.
+                               // Thus, check the table existence explicitly
+                               if (tableExists(newPath)) {
+                                       throw new 
TableAlreadyExistException(catalogName, newPath);
+                               } else {
+                                       Table table = getHiveTable(tablePath);
+                                       table.setTableName(newTableName);
+                                       
client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), 
table);
+                               }
+                       } else if (!ignoreIfNotExists) {
+                               throw new TableNotExistException(catalogName, 
tablePath);
+                       }
+               } catch (TException e) {
+                       throw new CatalogException(
+                               String.format("Failed to rename table %s", 
tablePath.getFullName()), e);
+               }
+       }
+
+       @Override
+       public void alterTable(ObjectPath tablePath, CatalogBaseTable 
newCatalogTable, boolean ignoreIfNotExists)
+                       throws TableNotExistException, CatalogException {
+               checkNotNull(tablePath, "tablePath cannot be null");
+               checkNotNull(newCatalogTable, "newCatalogTable cannot be null");
+
+               if (!tableExists(tablePath)) {
+                       if (!ignoreIfNotExists) {
+                               throw new TableNotExistException(catalogName, 
tablePath);
+                       }
+                       return;
+               }
+
+               Table oldTable = getHiveTable(tablePath);
+               TableType oldTableType = 
TableType.valueOf(oldTable.getTableType());
+
+               if (oldTableType == TableType.VIRTUAL_VIEW) {
+                       if (!(newCatalogTable instanceof CatalogView)) {
+                               throw new CatalogException(
+                                       String.format("Table types don't match. 
The existing table is a view, but the new catalog base table is not."));
+                       }
+                       // Else, do nothing
+               } else if ((oldTableType == TableType.MANAGED_TABLE)) {
+                       if (!(newCatalogTable instanceof CatalogTable)) {
+                               throw new CatalogException(
+                                       String.format("Table types don't match. 
The existing table is a table, but the new catalog base table is not."));
+                       }
+                       // Else, do nothing
+               } else {
+                       throw new CatalogException(
+                               String.format("Hive table type '%s' is not 
supported yet.",
+                                       oldTableType.name()));
+               }
+
+               Table newTable = instantiateHiveTable(tablePath, 
newCatalogTable);
+
+               // client.alter_table() requires a valid location
+               // thus, if new table doesn't have that, it reuses location of 
the old table
+               if (!newTable.getSd().isSetLocation()) {
+                       
newTable.getSd().setLocation(oldTable.getSd().getLocation());
+               }
+
+               try {
+                       client.alter_table(tablePath.getDatabaseName(), 
tablePath.getObjectName(), newTable);
+               } catch (TException e) {
+                       throw new CatalogException(String.format("Failed to 
rename table %s", tablePath.getFullName()), e);
+               }
+       }
+
+       @Override
+       public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists) 
throws TableNotExistException, CatalogException {
+               checkNotNull(tablePath, "tablePath cannot be null");
+
+               try {
+                       client.dropTable(
+                               tablePath.getDatabaseName(),
+                               tablePath.getObjectName(),
+                               // Indicate whether associated data should be 
deleted.
+                               // Set to 'true' for now because Flink tables 
shouldn't have data in Hive. Can be changed later if necessary
+                               true,
+                               ignoreIfNotExists);
+               } catch (NoSuchObjectException e) {
+                       if (!ignoreIfNotExists) {
+                               throw new TableNotExistException(catalogName, 
tablePath);
+                       }
+               } catch (TException e) {
+                       throw new CatalogException(
+                               String.format("Failed to drop table %s", 
tablePath.getFullName()), e);
+               }
+       }
+
+       @Override
+       public List<String> listTables(String databaseName) throws 
DatabaseNotExistException, CatalogException {
+               
checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName), "databaseName 
cannot be null or empty");
+
+               try {
+                       return client.getAllTables(databaseName);
+               } catch (UnknownDBException e) {
+                       throw new DatabaseNotExistException(catalogName, 
databaseName);
+               } catch (TException e) {
+                       throw new CatalogException(
+                               String.format("Failed to list tables in 
database %s", databaseName), e);
+               }
+       }
+
+       @Override
+       public List<String> listViews(String databaseName) throws 
DatabaseNotExistException, CatalogException {
+               
checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName), "databaseName 
cannot be null or empty");
+
+               try {
+                       return client.getTables(
+                               databaseName,
+                               null, // table pattern
+                               TableType.VIRTUAL_VIEW);
+               } catch (UnknownDBException e) {
+                       throw new DatabaseNotExistException(catalogName, 
databaseName);
+               } catch (TException e) {
+                       throw new CatalogException(
+                               String.format("Failed to list views in database 
%s", databaseName), e);
+               }
+       }
+
+       @Override
+       public boolean tableExists(ObjectPath tablePath) throws 
CatalogException {
+               checkNotNull(tablePath, "tablePath cannot be null");
+
+               try {
+                       return client.tableExists(tablePath.getDatabaseName(), 
tablePath.getObjectName());
+               } catch (UnknownDBException e) {
+                       return false;
+               } catch (TException e) {
+                       throw new CatalogException(
+                               String.format("Failed to check whether table %s 
exists or not.", tablePath.getFullName()), e);
+               }
+       }
+
+       private Table getHiveTable(ObjectPath tablePath) throws 
TableNotExistException {
+               try {
+                       return client.getTable(tablePath.getDatabaseName(), 
tablePath.getObjectName());
+               } catch (NoSuchObjectException e) {
+                       throw new TableNotExistException(catalogName, 
tablePath);
+               } catch (TException e) {
+                       throw new CatalogException(
+                               String.format("Failed to get table %s from Hive 
metastore", tablePath.getFullName()), e);
+               }
+       }
+
+       private static CatalogBaseTable instantiateHiveCatalogTable(Table 
hiveTable) {
+               boolean isView = TableType.valueOf(hiveTable.getTableType()) == 
TableType.VIRTUAL_VIEW;
 
                // Table properties
                Map<String, String> properties = hiveTable.getParameters();
-
-               // Table comment
+               boolean isGeneric = 
Boolean.valueOf(properties.get(GENERC_META_PROPERTY_KEY));
+               if (isGeneric) {
+                       properties = retrieveFlinkProperties(properties);
+               }
                String comment = 
properties.remove(HiveTableConfig.TABLE_COMMENT);
 
+               // Table schema
+               TableSchema tableSchema =
+                       
HiveTableUtil.createTableSchema(hiveTable.getSd().getCols(), 
hiveTable.getPartitionKeys());
+
                // Partition keys
                List<String> partitionKeys = new ArrayList<>();
-
                if (!hiveTable.getPartitionKeys().isEmpty()) {
-                       partitionKeys = hiveTable.getPartitionKeys().stream()
-                               .map(fs -> fs.getName())
-                               .collect(Collectors.toList());
+                       partitionKeys = 
hiveTable.getPartitionKeys().stream().map(fs -> 
fs.getName()).collect(Collectors.toList());
                }
 
-               if (TableType.valueOf(hiveTable.getTableType()) == 
TableType.VIRTUAL_VIEW) {
-                       return new HiveCatalogView(
-                               hiveTable.getViewOriginalText(),
-                               hiveTable.getViewExpandedText(),
-                               tableSchema,
-                               properties,
-                               comment
-                       );
+               if (isView) {
+                       if (isGeneric) {
+                               return new GenericCatalogView(
+                                       hiveTable.getViewOriginalText(),
+                                       hiveTable.getViewExpandedText(),
+                                       tableSchema,
+                                       properties,
+                                       comment
+                               );
+                       } else {
+                               return new HiveCatalogView(
+                                       hiveTable.getViewOriginalText(),
+                                       hiveTable.getViewExpandedText(),
+                                       tableSchema,
+                                       properties,
+                                       comment
+                               );
+                       }
                } else {
-                       return new HiveCatalogTable(
-                               tableSchema, partitionKeys, properties, 
comment);
+                       if (isGeneric) {
+                               return new GenericCatalogTable(tableSchema, 
partitionKeys, properties, comment);
+                       } else {
+                               return new HiveCatalogTable(tableSchema, 
partitionKeys, properties, comment);
+                       }
                }
        }
 
-       @Override
-       protected Table createHiveTable(ObjectPath tablePath, CatalogBaseTable 
table) {
-               Map<String, String> properties = new 
HashMap<>(table.getProperties());
-
-               // Table comment
-               properties.put(HiveTableConfig.TABLE_COMMENT, 
table.getComment());
-
+       private  static Table instantiateHiveTable(ObjectPath tablePath, 
CatalogBaseTable table) {
                Table hiveTable = new Table();
 
 Review comment:
   Shall we verify that `table` is one of `GenericCatalogTable/View` or 
`HiveCatalogTable/View`, similar to how the database type is verified?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to