This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new f919b23  [FLINK-12233][hive] Support table related operations in HiveCatalog
f919b23 is described below

commit f919b23890913aa8d3f6241a1d70ef82006863fc
Author: bowen.li <[email protected]>
AuthorDate: Tue May 7 11:52:17 2019 -0700

    [FLINK-12233][hive] Support table related operations in HiveCatalog
    
    This PR introduced HiveCatalogTable and implemented table related catalog APIs in HiveCatalog.
    
    This closes #8353.
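    
    For a quick sense of the new surface area, here is a minimal usage sketch.
    It is hypothetical: the catalog name, metastore URI, and schema are made up,
    and it assumes a HiveCatalog(catalogName, hivemetastoreURI) constructor
    mirroring the GenericHiveMetastoreCatalog one shown in the diff below.
    
        // Connect to a Hive metastore and exercise the new table operations.
        HiveCatalog catalog = new HiveCatalog("myhive", "thrift://localhost:9083");
        catalog.open();
    
        TableSchema schema = new TableSchema(
            new String[] {"name", "age"},
            new TypeInformation[] {Types.STRING, Types.INT});
        HiveCatalogTable table = new HiveCatalogTable(schema, new HashMap<>(), "test table");
    
        ObjectPath path = new ObjectPath("default", "tb1");
        catalog.createTable(path, table, false);
        CatalogBaseTable roundTripped = catalog.getTable(path);
    
        catalog.close();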
---
 .../catalog/hive/GenericHiveMetastoreCatalog.java  | 238 ++++++++++-----------
 .../flink/table/catalog/hive/HiveCatalog.java      | 110 +++++++---
 .../flink/table/catalog/hive/HiveCatalogBase.java  | 190 ++++++++++++++++
 .../flink/table/catalog/hive/HiveCatalogTable.java | 105 +++++++++
 .../flink/table/catalog/hive/HiveCatalogUtil.java  |   1 -
 .../flink/table/catalog/hive/HiveTableConfig.java  |   4 +-
 .../hive/util/GenericHiveMetastoreCatalogUtil.java | 171 ---------------
 .../HiveTableUtil.java}                            |  31 ++-
 .../hive/GenericHiveMetastoreCatalogTest.java      |   3 +-
 .../flink/table/catalog/hive/HiveCatalogTest.java  | 110 ++++------
 .../flink/table/catalog/hive/HiveTestUtils.java    |   7 +
 .../table/catalog/GenericInMemoryCatalogTest.java  |   4 +-
 .../flink/table/catalog/CatalogTestBase.java       |  33 ++-
 .../flink/table/catalog/CatalogTestUtil.java       |  13 +-
 14 files changed, 594 insertions(+), 426 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
index 843b4a2..c6d14f4 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
@@ -18,12 +18,17 @@
 
 package org.apache.flink.table.catalog.hive;
 
+import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.catalog.CatalogDatabase;
 import org.apache.flink.table.catalog.CatalogFunction;
 import org.apache.flink.table.catalog.CatalogPartition;
 import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogView;
 import org.apache.flink.table.catalog.GenericCatalogDatabase;
+import org.apache.flink.table.catalog.GenericCatalogTable;
+import org.apache.flink.table.catalog.GenericCatalogView;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
@@ -33,25 +38,28 @@ import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
 import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
 import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
 import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
-import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
 import org.apache.flink.table.catalog.hive.util.GenericHiveMetastoreCatalogUtil;
+import org.apache.flink.table.catalog.hive.util.HiveTableUtil;
 import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
 import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
 
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.TableType;
-import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
 import org.apache.hadoop.hive.metastore.api.Database;
-import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.metastore.api.UnknownDBException;
-import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
 
 /**
  * A catalog that persists all Flink streaming and batch metadata by using Hive metastore as a persistent storage.
@@ -59,6 +67,10 @@ import java.util.List;
 public class GenericHiveMetastoreCatalog extends HiveCatalogBase {
        private static final Logger LOG = LoggerFactory.getLogger(GenericHiveMetastoreCatalog.class);
 
+       // Prefix used to distinguish properties created by Hive and Flink,
+       // as Hive metastore has its own properties created upon table creation and migration between different versions of metastore.
+       private static final String FLINK_PROPERTY_PREFIX = "flink.";
+
        public GenericHiveMetastoreCatalog(String catalogName, String hivemetastoreURI) {
                super(catalogName, hivemetastoreURI);
 
@@ -95,144 +107,103 @@ public class GenericHiveMetastoreCatalog extends HiveCatalogBase {
        // ------ tables and views------
 
        @Override
-       public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
-                       throws TableNotExistException, CatalogException {
-               try {
-                       client.dropTable(
-                               tablePath.getDatabaseName(),
-                               tablePath.getObjectName(),
-                               // Indicate whether associated data should be deleted.
-                               // Set to 'true' for now because Flink tables shouldn't have data in Hive. Can be changed later if necessary
-                               true,
-                               ignoreIfNotExists);
-               } catch (NoSuchObjectException e) {
-                       if (!ignoreIfNotExists) {
-                               throw new TableNotExistException(catalogName, tablePath);
-                       }
-               } catch (TException e) {
+       protected void validateCatalogBaseTable(CatalogBaseTable table)
+                       throws CatalogException {
+               if (!(table instanceof GenericCatalogTable) && !(table instanceof GenericCatalogView)) {
                        throw new CatalogException(
-                               String.format("Failed to drop table %s", tablePath.getFullName()), e);
+                               "GenericHiveMetastoreCatalog can only operate on GenericCatalogTable and GenericCatalogView.");
                }
        }
 
        @Override
-       public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists)
-                       throws TableNotExistException, TableAlreadyExistException, CatalogException {
-               try {
-                       // alter_table() doesn't throw a clear exception when target table doesn't exist. Thus, check the table existence explicitly
-                       if (tableExists(tablePath)) {
-                               ObjectPath newPath = new ObjectPath(tablePath.getDatabaseName(), newTableName);
-                               // alter_table() doesn't throw a clear exception when new table already exists. Thus, check the table existence explicitly
-                               if (tableExists(newPath)) {
-                                       throw new TableAlreadyExistException(catalogName, newPath);
-                               } else {
-                                       Table table = getHiveTable(tablePath);
-                                       table.setTableName(newTableName);
-                                       client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), table);
-                               }
-                       } else if (!ignoreIfNotExists) {
-                               throw new TableNotExistException(catalogName, tablePath);
-                       }
-               } catch (TException e) {
-                       throw new CatalogException(
-                               String.format("Failed to rename table %s", tablePath.getFullName()), e);
-               }
-       }
+       protected CatalogBaseTable createCatalogBaseTable(Table hiveTable) {
+               // Table schema
+               TableSchema tableSchema = HiveTableUtil.createTableSchema(
+                       hiveTable.getSd().getCols(), hiveTable.getPartitionKeys());
 
-       @Override
-       public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
-                       throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
-               if (!databaseExists(tablePath.getDatabaseName())) {
-                       throw new DatabaseNotExistException(catalogName, tablePath.getDatabaseName());
-               } else {
-                       try {
-                               client.createTable(GenericHiveMetastoreCatalogUtil.createHiveTable(tablePath, table));
-                       } catch (AlreadyExistsException e) {
-                               if (!ignoreIfExists) {
-                                       throw new TableAlreadyExistException(catalogName, tablePath);
-                               }
-                       } catch (TException e) {
-                               throw new CatalogException(String.format("Failed to create table %s", tablePath.getFullName()), e);
-                       }
+               // Table properties
+               Map<String, String> properties = retrieveFlinkProperties(hiveTable.getParameters());
+
+               // Table comment
+               String comment = properties.remove(HiveTableConfig.TABLE_COMMENT);
+
+               // Partition keys
+               List<String> partitionKeys = new ArrayList<>();
+
+               if (!hiveTable.getPartitionKeys().isEmpty()) {
+                       partitionKeys = hiveTable.getPartitionKeys().stream()
+                               .map(fs -> fs.getName())
+                               .collect(Collectors.toList());
                }
-       }
 
-       @Override
-       public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists)
-                       throws TableNotExistException, CatalogException {
-               if (!tableExists(tablePath)) {
-                       if (!ignoreIfNotExists) {
-                               throw new TableNotExistException(catalogName, tablePath);
-                       }
+               if (TableType.valueOf(hiveTable.getTableType()) == TableType.VIRTUAL_VIEW) {
+                       return new GenericCatalogView(
+                               hiveTable.getViewOriginalText(),
+                               hiveTable.getViewExpandedText(),
+                               tableSchema,
+                               properties,
+                               comment
+                       );
                } else {
-                       // IMetastoreClient.alter_table() requires the table to have a valid location, which it doesn't in this case
-                       // Thus we have to translate alterTable() into (dropTable() + createTable())
-                       dropTable(tablePath, false);
-                       try {
-                               createTable(tablePath, newTable, false);
-                       } catch (TableAlreadyExistException | DatabaseNotExistException e) {
-                               // These exceptions wouldn't be thrown, unless a concurrent operation is triggered in Hive
-                               throw new CatalogException(
-                                       String.format("Failed to alter table %s", tablePath), e);
-                       }
+                       return new GenericCatalogTable(
+                               tableSchema, partitionKeys, properties, comment);
                }
        }
 
        @Override
-       public List<String> listTables(String databaseName) throws DatabaseNotExistException, CatalogException {
-               try {
-                       return client.getAllTables(databaseName);
-               } catch (UnknownDBException e) {
-                       throw new DatabaseNotExistException(catalogName, databaseName);
-               } catch (TException e) {
-                       throw new CatalogException(
-                               String.format("Failed to list tables in database %s", databaseName), e);
-               }
-       }
+       protected Table createHiveTable(ObjectPath tablePath, CatalogBaseTable table) {
+               Map<String, String> properties = new HashMap<>(table.getProperties());
 
-       @Override
-       public List<String> listViews(String databaseName) throws DatabaseNotExistException, CatalogException {
-               try {
-                       return client.getTables(
-                               databaseName,
-                               null, // table pattern
-                               TableType.VIRTUAL_VIEW);
-               } catch (UnknownDBException e) {
-                       throw new DatabaseNotExistException(catalogName, databaseName);
-               } catch (TException e) {
-                       throw new CatalogException(
-                               String.format("Failed to list views in database %s", databaseName), e);
-               }
-       }
+               // Table comment
+               properties.put(HiveTableConfig.TABLE_COMMENT, table.getComment());
 
-       @Override
-       public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistException, CatalogException {
-               Table hiveTable = getHiveTable(tablePath);
+               Table hiveTable = new Table();
+               hiveTable.setDbName(tablePath.getDatabaseName());
+               hiveTable.setTableName(tablePath.getObjectName());
+               hiveTable.setCreateTime((int) (System.currentTimeMillis() / 1000));
 
-               return GenericHiveMetastoreCatalogUtil.createCatalogTable(hiveTable);
-       }
+               // Table properties
+               hiveTable.setParameters(maskFlinkProperties(properties));
 
-       protected Table getHiveTable(ObjectPath tablePath) throws TableNotExistException {
-               try {
-                       return client.getTable(tablePath.getDatabaseName(), tablePath.getObjectName());
-               } catch (NoSuchObjectException e) {
-                       throw new TableNotExistException(catalogName, tablePath);
-               } catch (TException e) {
-                       throw new CatalogException(
-                               String.format("Failed to get table %s from Hive metastore", tablePath.getFullName()), e);
-               }
-       }
+               // Hive table's StorageDescriptor
+               StorageDescriptor sd = new StorageDescriptor();
+               sd.setSerdeInfo(new SerDeInfo(null, null, new HashMap<>()));
 
-       @Override
-       public boolean tableExists(ObjectPath tablePath) throws CatalogException {
-               try {
-                       return client.tableExists(tablePath.getDatabaseName(), tablePath.getObjectName());
-               } catch (UnknownDBException e) {
-                       return false;
-               } catch (TException e) {
-                       throw new CatalogException(
-                               String.format("Failed to check whether table %s exists or not.", tablePath.getFullName()), e);
+               List<FieldSchema> allColumns = HiveTableUtil.createHiveColumns(table.getSchema());
+
+               // Table columns and partition keys
+               if (table instanceof CatalogTable) {
+                       CatalogTable catalogTable = (CatalogTable) table;
+
+                       if (catalogTable.isPartitioned()) {
+                               int partitionKeySize = catalogTable.getPartitionKeys().size();
+                               List<FieldSchema> regularColumns = allColumns.subList(0, allColumns.size() - partitionKeySize);
+                               List<FieldSchema> partitionColumns = allColumns.subList(allColumns.size() - partitionKeySize, allColumns.size());
+
+                               sd.setCols(regularColumns);
+                               hiveTable.setPartitionKeys(partitionColumns);
+                       } else {
+                               sd.setCols(allColumns);
+                               hiveTable.setPartitionKeys(new ArrayList<>());
+                       }
+               } else if (table instanceof CatalogView) {
+                       CatalogView view = (CatalogView) table;
+
+                       // TODO: [FLINK-12398] Support partitioned view in catalog API
+                       sd.setCols(allColumns);
+                       hiveTable.setPartitionKeys(new ArrayList<>());
+
+                       hiveTable.setViewOriginalText(view.getOriginalQuery());
+                       hiveTable.setViewExpandedText(view.getExpandedQuery());
+                       hiveTable.setTableType(TableType.VIRTUAL_VIEW.name());
+               } else {
+                       throw new IllegalArgumentException(
+                               "GenericHiveMetastoreCatalog only supports CatalogTable and CatalogView");
                }
+
+               hiveTable.setSd(sd);
+
+               return hiveTable;
        }
 
        // ------ partitions ------
@@ -363,4 +334,23 @@ public class GenericHiveMetastoreCatalog extends HiveCatalogBase {
                throw new UnsupportedOperationException();
        }
 
+       // ------ utils ------
+
+       /**
+        * Filter out Hive-created properties, and return Flink-created properties.
+        */
+       private static Map<String, String> retrieveFlinkProperties(Map<String, String> hiveTableParams) {
+               return hiveTableParams.entrySet().stream()
+                       .filter(e -> e.getKey().startsWith(FLINK_PROPERTY_PREFIX))
+                       .collect(Collectors.toMap(e -> e.getKey().replace(FLINK_PROPERTY_PREFIX, ""), e -> e.getValue()));
+       }
+
+       /**
+        * Add a prefix to Flink-created properties to distinguish them from Hive-created properties.
+        */
+       private static Map<String, String> maskFlinkProperties(Map<String, String> properties) {
+               return properties.entrySet().stream()
+                       .filter(e -> e.getKey() != null && e.getValue() != null)
+                       .collect(Collectors.toMap(e -> FLINK_PROPERTY_PREFIX + e.getKey(), e -> e.getValue()));
+       }
 }
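
The "flink." prefix handling above is symmetric: maskFlinkProperties prefixes every Flink-created property before the table is written to the metastore, and retrieveFlinkProperties keeps only the prefixed entries and strips the prefix on the way back, so Hive-created parameters never leak into the Flink-side properties. A small hypothetical illustration (variable names are made up):

    // Writing: {"connector" -> "kafka"} is stored as {"flink.connector" -> "kafka"}.
    Map<String, String> masked = maskFlinkProperties(flinkProps);

    // Reading: Hive-created entries such as "transient_lastDdlTime" are filtered
    // out, and "flink.connector" is returned as "connector" again.
    Map<String, String> restored = retrieveFlinkProperties(hiveTable.getParameters());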
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
index e1c2c30..5ec6fd8 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -18,11 +18,13 @@
 
 package org.apache.flink.table.catalog.hive;
 
+import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.catalog.CatalogDatabase;
 import org.apache.flink.table.catalog.CatalogFunction;
 import org.apache.flink.table.catalog.CatalogPartition;
 import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
@@ -32,18 +34,26 @@ import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
 import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
 import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
 import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
-import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.catalog.hive.util.HiveTableUtil;
 import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
 import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
 
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
 
 /**
  * A catalog implementation for Hive.
@@ -88,48 +98,84 @@ public class HiveCatalog extends HiveCatalogBase {
        // ------ tables and views------
 
        @Override
-       public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
-                       throws TableNotExistException, CatalogException {
-               throw new UnsupportedOperationException();
+       protected void validateCatalogBaseTable(CatalogBaseTable table)
+                       throws CatalogException {
+               // TODO: validate HiveCatalogView
+               if (!(table instanceof HiveCatalogTable)) {
+                       throw new CatalogException(
+                               "HiveCatalog can only operate on HiveCatalogTable and HiveCatalogView.");
+               }
        }
 
        @Override
-       public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists)
-                       throws TableNotExistException, TableAlreadyExistException, CatalogException {
-               throw new UnsupportedOperationException();
-       }
+       protected CatalogBaseTable createCatalogBaseTable(Table hiveTable) {
+               // Table schema
+               TableSchema tableSchema =
+                       HiveTableUtil.createTableSchema(hiveTable.getSd().getCols(), hiveTable.getPartitionKeys());
 
-       @Override
-       public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
-                       throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
-               throw new UnsupportedOperationException();
-       }
+               // Table properties
+               Map<String, String> properties = hiveTable.getParameters();
 
-       @Override
-       public void alterTable(ObjectPath tableName, CatalogBaseTable newTable, boolean ignoreIfNotExists)
-                       throws TableNotExistException, CatalogException {
-               throw new UnsupportedOperationException();
-       }
+               // Table comment
+               String comment = properties.remove(HiveTableConfig.TABLE_COMMENT);
 
-       @Override
-       public List<String> listTables(String databaseName)
-                       throws DatabaseNotExistException, CatalogException {
-               throw new UnsupportedOperationException();
-       }
+               // Partition keys
+               List<String> partitionKeys = new ArrayList<>();
 
-       @Override
-       public List<String> listViews(String databaseName) throws DatabaseNotExistException, CatalogException {
-               throw new UnsupportedOperationException();
-       }
+               if (!hiveTable.getPartitionKeys().isEmpty()) {
+                       partitionKeys = hiveTable.getPartitionKeys().stream()
+                               .map(fs -> fs.getName())
+                               .collect(Collectors.toList());
+               }
 
-       @Override
-       public CatalogBaseTable getTable(ObjectPath objectPath) throws TableNotExistException, CatalogException {
-               throw new UnsupportedOperationException();
+               return new HiveCatalogTable(tableSchema, partitionKeys, properties, comment);
        }
 
        @Override
-       public boolean tableExists(ObjectPath objectPath) throws CatalogException {
-               throw new UnsupportedOperationException();
+       protected Table createHiveTable(ObjectPath tablePath, CatalogBaseTable table) {
+               Map<String, String> properties = new HashMap<>(table.getProperties());
+
+               // Table comment
+               properties.put(HiveTableConfig.TABLE_COMMENT, table.getComment());
+
+               Table hiveTable = new Table();
+               hiveTable.setDbName(tablePath.getDatabaseName());
+               hiveTable.setTableName(tablePath.getObjectName());
+               hiveTable.setCreateTime((int) (System.currentTimeMillis() / 1000));
+
+               // Table properties
+               hiveTable.setParameters(properties);
+
+               // Hive table's StorageDescriptor
+               // TODO: This is very basic Hive table.
+               //  [FLINK-11479] Add input/output format and SerDeLib information for Hive tables in HiveCatalogUtil#createHiveTable
+               StorageDescriptor sd = new StorageDescriptor();
+               sd.setSerdeInfo(new SerDeInfo(null, null, new HashMap<>()));
+
+               List<FieldSchema> allColumns = HiveTableUtil.createHiveColumns(table.getSchema());
+
+               // Table columns and partition keys
+               if (table instanceof CatalogTable) {
+                       HiveCatalogTable catalogTable = (HiveCatalogTable) table;
+
+                       if (catalogTable.isPartitioned()) {
+                               int partitionKeySize = catalogTable.getPartitionKeys().size();
+                               List<FieldSchema> regularColumns = allColumns.subList(0, allColumns.size() - partitionKeySize);
+                               List<FieldSchema> partitionColumns = allColumns.subList(allColumns.size() - partitionKeySize, allColumns.size());
+
+                               sd.setCols(regularColumns);
+                               hiveTable.setPartitionKeys(partitionColumns);
+                       } else {
+                               sd.setCols(allColumns);
+                               hiveTable.setPartitionKeys(new ArrayList<>());
+                       }
+               } else {
+                       throw new UnsupportedOperationException("HiveCatalog doesn't support view yet");
+               }
+
+               hiveTable.setSd(sd);
+
+               return hiveTable;
        }
 
        // ------ partitions ------
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java
index 7c2fdcc..5c44ebc 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java
@@ -19,21 +19,28 @@
 package org.apache.flink.table.catalog.hive;
 
 import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
 import org.apache.flink.util.StringUtils;
 
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
+import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.UnknownDBException;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -89,6 +96,34 @@ public abstract class HiveCatalogBase implements Catalog {
                }
        }
 
+       // ------ APIs ------
+
+       /**
+        * Validate input base table.
+        *
+        * @param catalogBaseTable the base table to be validated
+        * @throws CatalogException thrown if the input base table is invalid.
+        */
+       protected abstract void validateCatalogBaseTable(CatalogBaseTable catalogBaseTable)
+               throws CatalogException;
+
+       /**
+        * Create a CatalogBaseTable from a Hive table.
+        *
+        * @param hiveTable a Hive table
+        * @return a CatalogBaseTable
+        */
+       protected abstract CatalogBaseTable createCatalogBaseTable(Table hiveTable);
+
+       /**
+        * Create a Hive table from a CatalogBaseTable.
+        *
+        * @param tablePath path of the table
+        * @param table a CatalogBaseTable
+        * @return a Hive table
+        */
+       protected abstract Table createHiveTable(ObjectPath tablePath, CatalogBaseTable table);
+
        @Override
        public void open() throws CatalogException {
                if (client == null) {
@@ -197,4 +232,159 @@ public abstract class HiveCatalogBase implements Catalog {
                        throw new CatalogException(String.format("Failed to alter database %s", name), e);
                }
        }
+
+       // ------ tables ------
+
+       @Override
+       public CatalogBaseTable getTable(ObjectPath tablePath)
+                       throws TableNotExistException, CatalogException {
+               return createCatalogBaseTable(getHiveTable(tablePath));
+       }
+
+       @Override
+       public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
+                       throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
+               validateCatalogBaseTable(table);
+
+               if (!databaseExists(tablePath.getDatabaseName())) {
+                       throw new DatabaseNotExistException(catalogName, tablePath.getDatabaseName());
+               } else {
+                       try {
+                               client.createTable(createHiveTable(tablePath, table));
+                       } catch (AlreadyExistsException e) {
+                               if (!ignoreIfExists) {
+                                       throw new TableAlreadyExistException(catalogName, tablePath);
+                               }
+                       } catch (TException e) {
+                               throw new CatalogException(String.format("Failed to create table %s", tablePath.getFullName()), e);
+                       }
+               }
+       }
+
+       @Override
+       public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists)
+                       throws TableNotExistException, TableAlreadyExistException, CatalogException {
+               try {
+                       // alter_table() doesn't throw a clear exception when target table doesn't exist.
+                       // Thus, check the table existence explicitly
+                       if (tableExists(tablePath)) {
+                               ObjectPath newPath = new ObjectPath(tablePath.getDatabaseName(), newTableName);
+                               // alter_table() doesn't throw a clear exception when new table already exists.
+                               // Thus, check the table existence explicitly
+                               if (tableExists(newPath)) {
+                                       throw new TableAlreadyExistException(catalogName, newPath);
+                               } else {
+                                       Table table = getHiveTable(tablePath);
+                                       table.setTableName(newTableName);
+                                       client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), table);
+                               }
+                       } else if (!ignoreIfNotExists) {
+                               throw new TableNotExistException(catalogName, tablePath);
+                       }
+               } catch (TException e) {
+                       throw new CatalogException(
+                               String.format("Failed to rename table %s", tablePath.getFullName()), e);
+               }
+       }
+
+       @Override
+       public void alterTable(ObjectPath tablePath, CatalogBaseTable newCatalogTable, boolean ignoreIfNotExists)
+                       throws TableNotExistException, CatalogException {
+               validateCatalogBaseTable(newCatalogTable);
+
+               try {
+                       if (!tableExists(tablePath)) {
+                               if (!ignoreIfNotExists) {
+                                       throw new TableNotExistException(catalogName, tablePath);
+                               }
+                       } else {
+                               // TODO: [FLINK-12452] alterTable() in all catalogs should ensure existing base table and the new one are of the same type
+                               Table newTable = createHiveTable(tablePath, newCatalogTable);
+
+                               // client.alter_table() requires a valid location
+                               // thus, if new table doesn't have that, it reuses location of the old table
+                               if (!newTable.getSd().isSetLocation()) {
+                                       Table oldTable = getHiveTable(tablePath);
+                                       newTable.getSd().setLocation(oldTable.getSd().getLocation());
+                               }
+
+                               client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), newTable);
+                       }
+               } catch (TException e) {
+                       throw new CatalogException(
+                               String.format("Failed to alter table %s", tablePath.getFullName()), e);
+               }
+       }
+
+       @Override
+       public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
+                       throws TableNotExistException, CatalogException {
+               try {
+                       client.dropTable(
+                               tablePath.getDatabaseName(),
+                               tablePath.getObjectName(),
+                               // Indicate whether associated data should be deleted.
+                               // Set to 'true' for now because Flink tables shouldn't have data in Hive. Can be changed later if necessary
+                               true,
+                               ignoreIfNotExists);
+               } catch (NoSuchObjectException e) {
+                       if (!ignoreIfNotExists) {
+                               throw new TableNotExistException(catalogName, tablePath);
+                       }
+               } catch (TException e) {
+                       throw new CatalogException(
+                               String.format("Failed to drop table %s", tablePath.getFullName()), e);
+               }
+       }
+
+       @Override
+       public List<String> listTables(String databaseName)
+                       throws DatabaseNotExistException, CatalogException {
+               try {
+                       return client.getAllTables(databaseName);
+               } catch (UnknownDBException e) {
+                       throw new DatabaseNotExistException(catalogName, databaseName);
+               } catch (TException e) {
+                       throw new CatalogException(
+                               String.format("Failed to list tables in database %s", databaseName), e);
+               }
+       }
+
+       @Override
+       public List<String> listViews(String databaseName) throws DatabaseNotExistException, CatalogException {
+               try {
+                       return client.getTables(
+                               databaseName,
+                               null, // table pattern
+                               TableType.VIRTUAL_VIEW);
+               } catch (UnknownDBException e) {
+                       throw new DatabaseNotExistException(catalogName, databaseName);
+               } catch (TException e) {
+                       throw new CatalogException(
+                               String.format("Failed to list views in database %s", databaseName), e);
+               }
+       }
+
+       @Override
+       public boolean tableExists(ObjectPath tablePath) throws CatalogException {
+               try {
+                       return client.tableExists(tablePath.getDatabaseName(), tablePath.getObjectName());
+               } catch (UnknownDBException e) {
+                       return false;
+               } catch (TException e) {
+                       throw new CatalogException(
+                               String.format("Failed to check whether table %s exists or not.", tablePath.getFullName()), e);
+               }
+       }
+
+       private Table getHiveTable(ObjectPath tablePath) throws TableNotExistException {
+               try {
+                       return client.getTable(tablePath.getDatabaseName(), tablePath.getObjectName());
+               } catch (NoSuchObjectException e) {
+                       throw new TableNotExistException(catalogName, tablePath);
+               } catch (TException e) {
+                       throw new CatalogException(
+                               String.format("Failed to get table %s from Hive metastore", tablePath.getFullName()), e);
+               }
+       }
 }
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogTable.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogTable.java
new file mode 100644
index 0000000..87fb413
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogTable.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.hive;
+
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogTable;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A Hive catalog table implementation.
+ */
+public class HiveCatalogTable implements CatalogTable {
+       // Schema of the table (column names and types)
+       private final TableSchema tableSchema;
+       // Partition keys if this is a partitioned table. It's an empty set if the table is not partitioned
+       private final List<String> partitionKeys;
+       // Properties of the table
+       private final Map<String, String> properties;
+       // Comment of the table
+       private String comment = "This is a hive catalog table.";
+
+       public HiveCatalogTable(
+                       TableSchema tableSchema,
+                       List<String> partitionKeys,
+                       Map<String, String> properties,
+                       String comment) {
+               this.tableSchema = checkNotNull(tableSchema, "tableSchema cannot be null");
+               this.partitionKeys = checkNotNull(partitionKeys, "partitionKeys cannot be null");
+               this.properties = checkNotNull(properties, "properties cannot be null");
+               this.comment = comment;
+       }
+
+       public HiveCatalogTable(
+                       TableSchema tableSchema,
+                       Map<String, String> properties,
+                       String comment) {
+               this(tableSchema, new ArrayList<>(), properties, comment);
+       }
+
+       @Override
+       public boolean isPartitioned() {
+               return !partitionKeys.isEmpty();
+       }
+
+       @Override
+       public List<String> getPartitionKeys() {
+               return partitionKeys;
+       }
+
+       @Override
+       public Map<String, String> getProperties() {
+               return properties;
+       }
+
+       @Override
+       public TableSchema getSchema() {
+               return tableSchema;
+       }
+
+       @Override
+       public String getComment() {
+               return comment;
+       }
+
+       @Override
+       public CatalogBaseTable copy() {
+               return new HiveCatalogTable(
+                       tableSchema.copy(), new ArrayList<>(partitionKeys), new HashMap<>(properties), comment);
+       }
+
+       @Override
+       public Optional<String> getDescription() {
+               return Optional.ofNullable(comment);
+       }
+
+       @Override
+       public Optional<String> getDetailedDescription() {
+               // TODO: return a detailed description
+               return Optional.ofNullable(comment);
+       }
+}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogUtil.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogUtil.java
index e972ba4..1a64a68 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogUtil.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogUtil.java
@@ -43,6 +43,5 @@ public class HiveCatalogUtil {
                        db.getComment(),
                        hiveCatalogDatabase.getLocation(),
                        hiveCatalogDatabase.getProperties());
-
        }
 }
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveTableConfig.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveTableConfig.java
index 336d16f..e5063f2 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveTableConfig.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveTableConfig.java
@@ -19,11 +19,11 @@
 package org.apache.flink.table.catalog.hive;
 
 /**
- * Configs for Flink tables stored in Hive metastore.
+ * Configs for tables in Hive metastore.
  */
 public class HiveTableConfig {
 
-       // Description of the Flink table
+       // Comment of the Flink table
        public static final String TABLE_COMMENT = "comment";
 
 }
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/GenericHiveMetastoreCatalogUtil.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/GenericHiveMetastoreCatalogUtil.java
index a88f083..7564e40 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/GenericHiveMetastoreCatalogUtil.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/GenericHiveMetastoreCatalogUtil.java
@@ -18,46 +18,15 @@
 
 package org.apache.flink.table.catalog.hive.util;
 
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.catalog.CatalogDatabase;
-import org.apache.flink.table.catalog.CatalogTable;
-import org.apache.flink.table.catalog.CatalogView;
-import org.apache.flink.table.catalog.GenericCatalogTable;
-import org.apache.flink.table.catalog.GenericCatalogView;
-import org.apache.flink.table.catalog.ObjectPath;
-import org.apache.flink.table.catalog.hive.HiveCatalogBaseUtil;
-import org.apache.flink.table.catalog.hive.HiveTableConfig;
-import org.apache.flink.table.catalog.hive.HiveTypeUtil;
 
-import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.Database;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.SerDeInfo;
-import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
-import org.apache.hadoop.hive.metastore.api.Table;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
 
 /**
  * Utils to convert meta objects between Flink and Hive for GenericHiveMetastoreCatalog.
  */
 public class GenericHiveMetastoreCatalogUtil {
 
-       // Prefix used to distinguish properties created by Hive and Flink,
-       // as Hive metastore has its own properties created upon table creation and migration between different versions of metastore.
-       private static final String FLINK_PROPERTY_PREFIX = "flink.";
-
-       // Flink tables should be stored as 'external' tables in Hive metastore
-       private static final Map<String, String> EXTERNAL_TABLE_PROPERTY = new HashMap<String, String>() {{
-               put("EXTERNAL", "TRUE");
-       }};
-
        private GenericHiveMetastoreCatalogUtil() {
        }
 
@@ -77,144 +46,4 @@ public class GenericHiveMetastoreCatalogUtil {
                        null,
                        catalogDatabase.getProperties());
        }
-
-       /**
-        * Creates a Hive table from a CatalogBaseTable.
-        *
-        * @param tablePath path of the table
-        * @param table the CatalogBaseTable instance
-        * @return a Hive table
-        */
-       public static Table createHiveTable(ObjectPath tablePath, CatalogBaseTable table) {
-               Map<String, String> properties = new HashMap<>(table.getProperties());
-
-               // Table comment
-               properties.put(HiveTableConfig.TABLE_COMMENT, table.getComment());
-
-               Table hiveTable = new Table();
-               hiveTable.setDbName(tablePath.getDatabaseName());
-               hiveTable.setTableName(tablePath.getObjectName());
-               hiveTable.setCreateTime((int) (System.currentTimeMillis() / 1000));
-
-               // Table properties
-               hiveTable.setParameters(buildFlinkProperties(properties));
-               hiveTable.getParameters().putAll(EXTERNAL_TABLE_PROPERTY);
-
-               // Hive table's StorageDescriptor
-               StorageDescriptor sd = new StorageDescriptor();
-               sd.setSerdeInfo(new SerDeInfo(null, null, new HashMap<>()));
-
-               List<FieldSchema> allColumns = createHiveColumns(table.getSchema());
-
-               // Table columns and partition keys
-               if (table instanceof CatalogTable) {
-                       CatalogTable catalogTable = (CatalogTable) table;
-
-                       if (catalogTable.isPartitioned()) {
-                               int partitionKeySize = catalogTable.getPartitionKeys().size();
-                               List<FieldSchema> regularColumns = allColumns.subList(0, allColumns.size() - partitionKeySize);
-                               List<FieldSchema> partitionColumns = allColumns.subList(allColumns.size() - partitionKeySize, allColumns.size());
-
-                               sd.setCols(regularColumns);
-                               hiveTable.setPartitionKeys(partitionColumns);
-                       } else {
-                               sd.setCols(allColumns);
-                               hiveTable.setPartitionKeys(new ArrayList<>());
-                       }
-
-                       hiveTable.setTableType(TableType.EXTERNAL_TABLE.name());
-               } else if (table instanceof CatalogView) {
-                       CatalogView view = (CatalogView) table;
-
-                       // TODO: [FLINK-12398] Support partitioned view in catalog API
-                       sd.setCols(allColumns);
-                       hiveTable.setPartitionKeys(new ArrayList<>());
-
-                       hiveTable.setViewOriginalText(view.getOriginalQuery());
-                       hiveTable.setViewExpandedText(view.getExpandedQuery());
-                       hiveTable.setTableType(TableType.VIRTUAL_VIEW.name());
-               } else {
-                       throw new IllegalArgumentException(
-                               "GenericHiveMetastoreCatalog only supports CatalogTable and CatalogView");
-               }
-
-               hiveTable.setSd(sd);
-
-               return hiveTable;
-       }
-
-       /**
-        * Creates a CatalogBaseTable from a Hive table.
-        *
-        * @param hiveTable the Hive table
-        * @return a CatalogBaseTable
-        */
-       public static CatalogBaseTable createCatalogTable(Table hiveTable) {
-               // Table schema
-               TableSchema tableSchema = HiveCatalogBaseUtil.createTableSchema(
-                               hiveTable.getSd().getCols(), hiveTable.getPartitionKeys());
-
-               // Table properties
-               Map<String, String> properties = retrieveFlinkProperties(hiveTable.getParameters());
-
-               // Table comment
-               String comment = properties.remove(HiveTableConfig.TABLE_COMMENT);
-
-               // Partition keys
-               List<String> partitionKeys = new ArrayList<>();
-
-               if (!hiveTable.getPartitionKeys().isEmpty()) {
-                       partitionKeys = hiveTable.getPartitionKeys().stream()
-                                                               .map(fs -> fs.getName())
-                                                               .collect(Collectors.toList());
-               }
-
-               if (TableType.valueOf(hiveTable.getTableType()) == TableType.VIRTUAL_VIEW) {
-                       return new GenericCatalogView(
-                               hiveTable.getViewOriginalText(),
-                               hiveTable.getViewExpandedText(),
-                               tableSchema,
-                               properties,
-                               comment
-                       );
-               } else {
-                       return new GenericCatalogTable(
-                               tableSchema, partitionKeys, properties, comment);
-               }
-       }
-
-       /**
-        * Create Hive columns from Flink TableSchema.
-        */
-       private static List<FieldSchema> createHiveColumns(TableSchema schema) {
-               String[] fieldNames = schema.getFieldNames();
-               TypeInformation[] fieldTypes = schema.getFieldTypes();
-
-               List<FieldSchema> columns = new ArrayList<>(fieldNames.length);
-
-               for (int i = 0; i < fieldNames.length; i++) {
-                       columns.add(
-                               new FieldSchema(fieldNames[i], HiveTypeUtil.toHiveType(fieldTypes[i]), null));
-               }
-
-               return columns;
-       }
-
-       /**
-        * Filter out Hive-created properties, and return Flink-created properties.
-        */
-       private static Map<String, String> retrieveFlinkProperties(Map<String, String> hiveTableParams) {
-               return hiveTableParams.entrySet().stream()
-                       .filter(e -> e.getKey().startsWith(FLINK_PROPERTY_PREFIX))
-                       .collect(Collectors.toMap(e -> e.getKey().replace(FLINK_PROPERTY_PREFIX, ""), e -> e.getValue()));
-       }
-
-       /**
-        * Add a prefix to Flink-created properties to distinguish them from Hive-created properties.
-        */
-       public static Map<String, String> buildFlinkProperties(Map<String, String> properties) {
-               return properties.entrySet().stream()
-                       .filter(e -> e.getKey() != null && e.getValue() != null)
-                       .collect(Collectors.toMap(e -> FLINK_PROPERTY_PREFIX + e.getKey(), e -> e.getValue()));
-       }
 }
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBaseUtil.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java
similarity index 72%
rename from flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBaseUtil.java
rename to flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java
index 3f93b4a..c7c68f1 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBaseUtil.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java
@@ -16,10 +16,11 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.catalog.hive;
+package org.apache.flink.table.catalog.hive.util;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.hive.HiveTypeUtil;
 
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
@@ -28,16 +29,15 @@ import java.util.ArrayList;
 import java.util.List;
 
 /**
- * Shared util for catalogs backed by Hive-metastore.
+ * Utils for Hive-backed tables.
  */
-public class HiveCatalogBaseUtil {
+public class HiveTableUtil {
+
+       private HiveTableUtil() {
+       }
 
        /**
         * Create a Flink's TableSchema from Hive table's columns and partition keys.
-        *
-        * @param cols columns of the Hive table
-        * @param partitionKeys partition keys of the Hive table
-        * @return a Flink TableSchema
         */
        public static TableSchema createTableSchema(List<FieldSchema> cols, List<FieldSchema> partitionKeys) {
                List<FieldSchema> allCols = new ArrayList<>(cols);
@@ -55,4 +55,21 @@ public class HiveCatalogBaseUtil {
 
                return new TableSchema(colNames, colTypes);
        }
+
+       /**
+        * Create Hive columns from Flink TableSchema.
+        */
+       public static List<FieldSchema> createHiveColumns(TableSchema schema) {
+               String[] fieldNames = schema.getFieldNames();
+               TypeInformation[] fieldTypes = schema.getFieldTypes();
+
+               List<FieldSchema> columns = new ArrayList<>(fieldNames.length);
+
+               for (int i = 0; i < fieldNames.length; i++) {
+                       columns.add(
+                               new FieldSchema(fieldNames[i], HiveTypeUtil.toHiveType(fieldTypes[i]), null));
+               }
+
+               return columns;
+       }
 }
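
The renamed util now converts schemas in both directions. A minimal sketch of the round trip; the field names and types are made up for illustration, and it relies on the TableSchema(String[], TypeInformation[]) constructor used in createTableSchema above:

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.table.api.TableSchema;
    import org.apache.flink.table.catalog.hive.util.HiveTableUtil;
    import org.apache.hadoop.hive.metastore.api.FieldSchema;

    public class HiveTableUtilSketch {
        public static void main(String[] args) {
            TableSchema schema = new TableSchema(
                new String[] {"name", "age"},
                new TypeInformation<?>[] {Types.STRING, Types.INT});

            // Flink schema -> Hive columns (the FieldSchema comments are left null).
            List<FieldSchema> hiveColumns = HiveTableUtil.createHiveColumns(schema);

            // Hive columns -> Flink schema; partition keys, if any, are appended last.
            TableSchema restored = HiveTableUtil.createTableSchema(hiveColumns, new ArrayList<>());

            System.out.println(restored); // expected to match the original schema
        }
    }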
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogTest.java
index 390ab1d..ca7df2e 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogTest.java
@@ -26,7 +26,6 @@ import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.CatalogDatabase;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.CatalogTestBase;
-import org.apache.flink.table.catalog.CatalogTestUtil;
 import org.apache.flink.table.catalog.CatalogView;
 import org.apache.flink.table.catalog.GenericCatalogDatabase;
 import org.apache.flink.table.catalog.GenericCatalogTable;
@@ -88,7 +87,7 @@ public class GenericHiveMetastoreCatalogTest extends CatalogTestBase {
                catalog.createDatabase(db1, createDb(), false);
                catalog.createTable(path1, table, false);
 
-               CatalogTestUtil.checkEquals(table, (CatalogTable) catalog.getTable(path1));
+               checkEquals(table, (CatalogTable) catalog.getTable(path1));
        }
 
        // ------ utils ------
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogTest.java
index dfd7fcf..2b7e397 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogTest.java
@@ -28,82 +28,37 @@ import org.junit.BeforeClass;
 import java.io.IOException;
 import java.util.HashMap;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
 /**
  * Test for HiveCatalog.
  */
 public class HiveCatalogTest extends CatalogTestBase {
+
        @BeforeClass
        public static void init() throws IOException {
-               catalog = HiveTestUtils.createGenericHiveMetastoreCatalog();
+               catalog = HiveTestUtils.createHiveCatalog();
                catalog.open();
        }
 
        // =====================
-       // HiveCatalog doesn't support table operation yet
-       // Thus, overriding the following tests which involve table operation in CatalogTestBase so they won't run against HiveCatalog
+       // HiveCatalog doesn't support streaming table operations. Ignore this test from CatalogTestBase.
        // =====================
 
-       // TODO: re-enable these tests once HiveCatalog support table operations
-       public void testDropDb_DatabaseNotEmptyException() throws Exception {
-       }
-
        public void testCreateTable_Streaming() throws Exception {
        }
 
-       public void testCreateTable_Batch() throws Exception {
-       }
-
-       public void testCreateTable_DatabaseNotExistException() throws Exception {
-       }
-
-       public void testCreateTable_TableAlreadyExistException() throws Exception {
-       }
-
-       public void testCreateTable_TableAlreadyExist_ignored() throws Exception {
-       }
-
-       public void testGetTable_TableNotExistException() throws Exception {
-       }
-
-       public void testGetTable_TableNotExistException_NoDb() throws Exception {
-       }
-
-       public void testDropTable_nonPartitionedTable() throws Exception {
-       }
-
-       public void testDropTable_TableNotExistException() throws Exception {
-       }
-
-       public void testDropTable_TableNotExist_ignored() throws Exception {
-       }
-
-       public void testAlterTable() throws Exception {
-       }
-
-       public void testAlterTable_TableNotExistException() throws Exception {
-       }
-
-       public void testAlterTable_TableNotExist_ignored() throws Exception {
-       }
-
-       public void testRenameTable_nonPartitionedTable() throws Exception {
-       }
-
-       public void testRenameTable_TableNotExistException() throws Exception {
-       }
-
-       public void testRenameTable_TableNotExistException_ignored() throws Exception {
-       }
+       // =====================
+       // HiveCatalog doesn't support view operations yet
+       // Thus, overriding the following tests which involve view operations in CatalogTestBase so they won't run against HiveCatalog
+       // =====================
 
-       public void testRenameTable_TableAlreadyExistException() throws Exception {
-       }
+       // TODO: re-enable these tests once HiveCatalog supports view operations
 
        public void testListTables() throws Exception {
        }
 
-       public void testTableExists() throws Exception {
-       }
-
        public void testCreateView() throws Exception {
        }
 
@@ -163,32 +118,43 @@ public class HiveCatalogTest extends CatalogTestBase {
 
        @Override
        public CatalogTable createTable() {
-               // TODO: implement this once HiveCatalog support table operations
-               return null;
+               return new HiveCatalogTable(
+                       createTableSchema(),
+                       getBatchTableProperties(),
+                       TEST_COMMENT
+               );
        }
 
        @Override
        public CatalogTable createAnotherTable() {
-               // TODO: implement this once HiveCatalog support table operations
-               return null;
+               return new HiveCatalogTable(
+                       createAnotherTableSchema(),
+                       getBatchTableProperties(),
+                       TEST_COMMENT
+               );
        }
 
        @Override
        public CatalogTable createStreamingTable() {
-               // TODO: implement this once HiveCatalog support table operations
-               return null;
+               throw new UnsupportedOperationException("HiveCatalog doesn't support streaming tables.");
        }
 
        @Override
        public CatalogTable createPartitionedTable() {
-               // TODO: implement this once HiveCatalog support table operations
-               return null;
+               return new HiveCatalogTable(
+                       createTableSchema(),
+                       createPartitionKeys(),
+                       getBatchTableProperties(),
+                       TEST_COMMENT);
        }
 
        @Override
        public CatalogTable createAnotherPartitionedTable() {
-               // TODO: implement this once HiveCatalog support table operations
-               return null;
+               return new HiveCatalogTable(
+                       createAnotherTableSchema(),
+                       createPartitionKeys(),
+                       getBatchTableProperties(),
+                       TEST_COMMENT);
        }
 
        @Override
@@ -202,4 +168,16 @@ public class HiveCatalogTest extends CatalogTestBase {
                // TODO: implement this once HiveCatalog support view operations
                return null;
        }
+
+       @Override
+       public void checkEquals(CatalogTable t1, CatalogTable t2) {
+               assertEquals(t1.getSchema(), t2.getSchema());
+               assertEquals(t1.getComment(), t2.getComment());
+               assertEquals(t1.getPartitionKeys(), t2.getPartitionKeys());
+               assertEquals(t1.isPartitioned(), t2.isPartitioned());
+
+               // Hive may add properties of its own to a table,
+               // so the properties of a Hive table are a superset of those of its corresponding Flink table
+               assertTrue(t2.getProperties().entrySet().containsAll(t1.getProperties().entrySet()));
+       }
 }
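
The relaxed property assertion in checkEquals above accounts for the metastore decorating tables with parameters of its own. An illustrative sketch of why plain map equality would fail; the Hive-added key and its value here are assumptions for illustration:

    import java.util.HashMap;
    import java.util.Map;

    public class PropertySupersetSketch {
        public static void main(String[] args) {
            Map<String, String> flinkProps = new HashMap<>();
            flinkProps.put("k1", "v1");

            // What comes back from the metastore: everything Flink wrote,
            // plus parameters Hive attached itself (key/value assumed).
            Map<String, String> hiveProps = new HashMap<>(flinkProps);
            hiveProps.put("transient_lastDdlTime", "1557248537");

            System.out.println(hiveProps.equals(flinkProps));                            // false
            System.out.println(hiveProps.entrySet().containsAll(flinkProps.entrySet())); // true
        }
    }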
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java
index 4a32313..2618c8c 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java
@@ -40,6 +40,13 @@ public class HiveTestUtils {
                return new GenericHiveMetastoreCatalog(CatalogTestBase.TEST_CATALOG_NAME, getHiveConf());
        }
 
+       /**
+        * Create a HiveCatalog with an embedded Hive Metastore.
+        */
+       public static HiveCatalog createHiveCatalog() throws IOException {
+               return new HiveCatalog(CatalogTestBase.TEST_CATALOG_NAME, 
getHiveConf());
+       }
+
        private static HiveConf getHiveConf() throws IOException {
                ClassLoader classLoader = new HiveTestUtils().getClass().getClassLoader();
                HiveConf.setHiveSiteLocation(classLoader.getResource(HIVE_SITE_XML));
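
With both factory methods in place, each catalog test class differs mainly in which factory its @BeforeClass hook calls. A minimal sketch of the intended lifecycle, mirroring the init() method shown earlier (the close() call assumes the usual catalog open/close contract):

    // Stand up a HiveCatalog against the embedded metastore, use it, tear it down.
    HiveCatalog catalog = HiveTestUtils.createHiveCatalog();
    catalog.open();
    // ... exercise database/table operations against the catalog ...
    catalog.close();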
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
index 6fabb78..89cc0ee 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
@@ -97,12 +97,12 @@ public class GenericInMemoryCatalogTest extends CatalogTestBase {
                CatalogPartitionSpec catalogPartitionSpec = createPartitionSpec();
                catalog.createPartition(path1, catalogPartitionSpec, catalogPartition, false);
 
-               CatalogTestUtil.checkEquals(table, (CatalogTable) catalog.getTable(path1));
+               checkEquals(table, (CatalogTable) catalog.getTable(path1));
                assertTrue(catalog.partitionExists(path1, catalogPartitionSpec));
 
                catalog.renameTable(path1, t2, false);
 
-               CatalogTestUtil.checkEquals(table, (CatalogTable) catalog.getTable(path3));
+               checkEquals(table, (CatalogTable) catalog.getTable(path3));
                assertTrue(catalog.partitionExists(path3, catalogPartitionSpec));
                assertFalse(catalog.tableExists(path1));
                assertFalse(catalog.partitionExists(path1, catalogPartitionSpec));
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java
index 93c9ff1..ec37104 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java
@@ -244,7 +244,7 @@ public abstract class CatalogTestBase {
                CatalogTable table = createStreamingTable();
                catalog.createTable(path1, table, false);
 
-               CatalogTestUtil.checkEquals(table, (CatalogTable) catalog.getTable(path1));
+               checkEquals(table, (CatalogTable) catalog.getTable(path1));
        }
 
        @Test
@@ -257,7 +257,7 @@ public abstract class CatalogTestBase {
 
                CatalogBaseTable tableCreated = catalog.getTable(path1);
 
-               CatalogTestUtil.checkEquals(table, (CatalogTable) tableCreated);
+               checkEquals(table, (CatalogTable) tableCreated);
                assertEquals(TEST_COMMENT, tableCreated.getDescription().get());
 
                List<String> tables = catalog.listTables(db1);
@@ -271,7 +271,7 @@ public abstract class CatalogTestBase {
                table = createPartitionedTable();
                catalog.createTable(path1, table, false);
 
-               CatalogTestUtil.checkEquals(table, (CatalogTable) catalog.getTable(path1));
+               checkEquals(table, (CatalogTable) catalog.getTable(path1));
 
                tables = catalog.listTables(db1);
 
@@ -305,11 +305,11 @@ public abstract class CatalogTestBase {
                CatalogTable table = createTable();
                catalog.createTable(path1, table, false);
 
-               CatalogTestUtil.checkEquals(table, (CatalogTable) catalog.getTable(path1));
+               checkEquals(table, (CatalogTable) catalog.getTable(path1));
 
                catalog.createTable(path1, createAnotherTable(), true);
 
-               CatalogTestUtil.checkEquals(table, (CatalogTable) catalog.getTable(path1));
+               checkEquals(table, (CatalogTable) catalog.getTable(path1));
        }
 
        @Test
@@ -361,13 +361,13 @@ public abstract class CatalogTestBase {
                CatalogTable table = createTable();
                catalog.createTable(path1, table, false);
 
-               CatalogTestUtil.checkEquals(table, (CatalogTable) catalog.getTable(path1));
+               checkEquals(table, (CatalogTable) catalog.getTable(path1));
 
                CatalogTable newTable = createAnotherTable();
                catalog.alterTable(path1, newTable, false);
 
                assertNotEquals(table, catalog.getTable(path1));
-               CatalogTestUtil.checkEquals(newTable, (CatalogTable) catalog.getTable(path1));
+               checkEquals(newTable, (CatalogTable) catalog.getTable(path1));
 
                catalog.dropTable(path1, false);
 
@@ -375,12 +375,12 @@ public abstract class CatalogTestBase {
                table = createPartitionedTable();
                catalog.createTable(path1, table, false);
 
-               CatalogTestUtil.checkEquals(table, (CatalogTable) catalog.getTable(path1));
+               checkEquals(table, (CatalogTable) catalog.getTable(path1));
 
                newTable = createAnotherPartitionedTable();
                catalog.alterTable(path1, newTable, false);
 
-               CatalogTestUtil.checkEquals(newTable, (CatalogTable) catalog.getTable(path1));
+               checkEquals(newTable, (CatalogTable) catalog.getTable(path1));
        }
 
        @Test
@@ -404,11 +404,11 @@ public abstract class CatalogTestBase {
                CatalogTable table = createTable();
                catalog.createTable(path1, table, false);
 
-               CatalogTestUtil.checkEquals(table, (CatalogTable) catalog.getTable(path1));
+               checkEquals(table, (CatalogTable) catalog.getTable(path1));
 
                catalog.renameTable(path1, t2, false);
 
-               CatalogTestUtil.checkEquals(table, (CatalogTable) catalog.getTable(path3));
+               checkEquals(table, (CatalogTable) catalog.getTable(path3));
                assertFalse(catalog.tableExists(path1));
        }
 
@@ -692,4 +692,15 @@ public abstract class CatalogTestBase {
                        put(IS_STREAMING, "true");
                }};
        }
+
+       // ------ equality check utils ------
+       // Can be overridden by sub test classes
+
+       protected void checkEquals(CatalogTable t1, CatalogTable t2) {
+               assertEquals(t1.getSchema(), t2.getSchema());
+               assertEquals(t1.getProperties(), t2.getProperties());
+               assertEquals(t1.getComment(), t2.getComment());
+               assertEquals(t1.getPartitionKeys(), t2.getPartitionKeys());
+               assertEquals(t1.isPartitioned(), t2.isPartitioned());
+       }
 }
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java
index b131698..412c73c 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java
@@ -28,6 +28,7 @@ import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataLong;
 import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataString;
 import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
 import org.apache.flink.table.catalog.stats.Date;
+import org.apache.flink.table.plan.stats.TableStats;
 
 import java.util.Map;
 
@@ -36,16 +37,12 @@ import static org.junit.Assert.assertTrue;
 
 /**
  * Utility class for catalog testing.
+ * TODO: Move util methods to CatalogTestBase and remove this class
  */
 public class CatalogTestUtil {
-
-       public static void checkEquals(CatalogTable t1, CatalogTable t2) {
-               assertEquals(t1.getSchema(), t2.getSchema());
-               assertEquals(t1.getComment(), t2.getComment());
-               assertEquals(t1.getProperties(), t2.getProperties());
-               assertEquals(t1.getPartitionKeys(), t2.getPartitionKeys());
-               assertEquals(t1.isPartitioned(), t2.isPartitioned());
-               assertEquals(t1.getDescription(), t2.getDescription());
+       public static void checkEquals(TableStats ts1, TableStats ts2) {
+               assertEquals(ts1.getRowCount(), ts2.getRowCount());
+               assertEquals(ts1.getColumnStats().size(), ts2.getColumnStats().size());
        }
 
        public static void checkEquals(CatalogView v1, CatalogView v2) {
