This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 95e1686  [FLINK-12235][hive] Support partition related operations in HiveCatalog
95e1686 is described below

commit 95e16862822922972f70a710bc09b7ec31c9043b
Author: Rui Li <[email protected]>
AuthorDate: Wed May 15 19:44:06 2019 +0800

    [FLINK-12235][hive] Support partition related operations in HiveCatalog
    
    This PR adds support for Hive partitions in HiveCatalog.
    
    This closes #8449.
---
 .../flink/table/catalog/hive/HiveCatalog.java      | 270 ++++++++++++++-
 .../table/catalog/hive/HiveCatalogPartition.java   |  61 ++++
 .../hive/HiveCatalogGenericMetadataTest.java       | 108 ++++++
 .../catalog/hive/HiveCatalogHiveMetadataTest.java  |  18 +
 .../table/catalog/GenericInMemoryCatalog.java      |  82 ++---
 .../table/catalog/GenericInMemoryCatalogTest.java  | 371 +-------------------
 .../org/apache/flink/table/catalog/Catalog.java    |   2 +-
 .../flink/table/catalog/CatalogTestBase.java       | 379 +++++++++++++++++++++
 .../flink/table/catalog/CatalogTestUtil.java       |   4 -
 9 files changed, 859 insertions(+), 436 deletions(-)
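
For orientation, a rough sketch of how the newly supported operations are driven
through the Catalog interface; the database/table names, partition keys, and the
pre-initialized HiveCatalog instance below are assumptions for illustration only,
not part of this change:

    import org.apache.flink.table.catalog.Catalog;
    import org.apache.flink.table.catalog.CatalogPartition;
    import org.apache.flink.table.catalog.CatalogPartitionSpec;
    import org.apache.flink.table.catalog.ObjectPath;
    import org.apache.flink.table.catalog.hive.HiveCatalogPartition;

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    class PartitionRoundTripSketch {
        // assumes "mydb.mytable" exists and is partitioned on ("name", "year"),
        // and that `catalog` is an already-configured HiveCatalog
        static void roundTrip(Catalog catalog) throws Exception {
            ObjectPath tablePath = new ObjectPath("mydb", "mytable");

            Map<String, String> spec = new HashMap<>();
            spec.put("name", "bob");
            spec.put("year", "2019");
            CatalogPartitionSpec partitionSpec = new CatalogPartitionSpec(spec);

            // create the partition, then read it back through the same interface
            catalog.createPartition(tablePath, partitionSpec, new HiveCatalogPartition(new HashMap<>()), false);

            boolean exists = catalog.partitionExists(tablePath, partitionSpec);          // true
            CatalogPartition partition = catalog.getPartition(tablePath, partitionSpec); // a HiveCatalogPartition
            List<CatalogPartitionSpec> specs = catalog.listPartitions(tablePath);        // contains partitionSpec

            catalog.dropPartition(tablePath, partitionSpec, false);
        }
    }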

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
index 1f0bddc..d2387f0 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -29,6 +29,7 @@ import org.apache.flink.table.catalog.CatalogPartition;
 import org.apache.flink.table.catalog.CatalogPartitionSpec;
 import org.apache.flink.table.catalog.GenericCatalogDatabase;
 import org.apache.flink.table.catalog.GenericCatalogFunction;
+import org.apache.flink.table.catalog.GenericCatalogPartition;
 import org.apache.flink.table.catalog.GenericCatalogTable;
 import org.apache.flink.table.catalog.GenericCatalogView;
 import org.apache.flink.table.catalog.GenericInMemoryCatalog;
@@ -53,6 +54,7 @@ import org.apache.flink.util.StringUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
@@ -63,6 +65,7 @@ import org.apache.hadoop.hive.metastore.api.FunctionType;
 import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.PrincipalType;
 import org.apache.hadoop.hive.metastore.api.SerDeInfo;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
@@ -500,7 +503,7 @@ public class HiveCatalog implements Catalog {
                // Partition keys
                List<String> partitionKeys = new ArrayList<>();
                if (!hiveTable.getPartitionKeys().isEmpty()) {
-                       partitionKeys = hiveTable.getPartitionKeys().stream().map(fs -> fs.getName()).collect(Collectors.toList());
+                       partitionKeys = getFieldNames(hiveTable.getPartitionKeys());
                }
 
                if (isView) {
@@ -608,44 +611,285 @@ public class HiveCatalog implements Catalog {
        // ------ partitions ------
 
        @Override
+       public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+                       throws CatalogException {
+               checkNotNull(tablePath, "Table path cannot be null");
+               checkNotNull(partitionSpec, "CatalogPartitionSpec cannot be null");
+
+               try {
+                       return getHivePartition(tablePath, partitionSpec) != null;
+               } catch (NoSuchObjectException | TableNotExistException | PartitionSpecInvalidException e) {
+                       return false;
+               } catch (TException e) {
+                       throw new CatalogException(
+                               String.format("Failed to get partition %s of table %s", partitionSpec, tablePath), e);
+               }
+       }
+
+       @Override
        public void createPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition partition, boolean ignoreIfExists)
                        throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, PartitionAlreadyExistsException, CatalogException {
-               throw new UnsupportedOperationException();
+               checkNotNull(tablePath, "Table path cannot be null");
+               checkNotNull(partitionSpec, "CatalogPartitionSpec cannot be null");
+               checkNotNull(partition, "Partition cannot be null");
+
+               if (!(partition instanceof HiveCatalogPartition)) {
+                       throw new CatalogException("Currently only supports HiveCatalogPartition");
+               }
+
+               Table hiveTable = getHiveTable(tablePath);
+
+               ensureTableAndPartitionMatch(hiveTable, partition);
+
+               ensurePartitionedTable(tablePath, hiveTable);
+
+               try {
+                       client.add_partition(instantiateHivePartition(hiveTable, partitionSpec, partition));
+               } catch (AlreadyExistsException e) {
+                       if (!ignoreIfExists) {
+                               throw new PartitionAlreadyExistsException(catalogName, tablePath, partitionSpec);
+                       }
+               } catch (TException e) {
+                       throw new CatalogException(
+                               String.format("Failed to create partition %s of table %s", partitionSpec, tablePath));
+               }
        }
 
        @Override
        public void dropPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, boolean ignoreIfNotExists)
                        throws PartitionNotExistException, CatalogException {
-               throw new UnsupportedOperationException();
-       }
+               checkNotNull(tablePath, "Table path cannot be null");
+               checkNotNull(partitionSpec, "CatalogPartitionSpec cannot be null");
 
-       @Override
-       public void alterPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition newPartition, boolean ignoreIfNotExists)
-                       throws PartitionNotExistException, CatalogException {
-               throw new UnsupportedOperationException();
+               try {
+                       Table hiveTable = getHiveTable(tablePath);
+                       client.dropPartition(tablePath.getDatabaseName(), tablePath.getObjectName(),
+                               getOrderedFullPartitionValues(partitionSpec, getFieldNames(hiveTable.getPartitionKeys()), tablePath), true);
+               } catch (NoSuchObjectException e) {
+                       if (!ignoreIfNotExists) {
+                               throw new PartitionNotExistException(catalogName, tablePath, partitionSpec, e);
+                       }
+               } catch (MetaException | TableNotExistException | PartitionSpecInvalidException e) {
+                       throw new PartitionNotExistException(catalogName, tablePath, partitionSpec, e);
+               } catch (TException e) {
+                       throw new CatalogException(
+                               String.format("Failed to drop partition %s of table %s", partitionSpec, tablePath));
+               }
        }
 
        @Override
        public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath)
                        throws TableNotExistException, TableNotPartitionedException, CatalogException {
-               throw new UnsupportedOperationException();
+               checkNotNull(tablePath, "Table path cannot be null");
+
+               Table hiveTable = getHiveTable(tablePath);
+
+               ensurePartitionedTable(tablePath, hiveTable);
+
+               try {
+                       // pass -1 as max_parts to fetch all partitions
+                       return client.listPartitionNames(tablePath.getDatabaseName(), tablePath.getObjectName(), (short) -1).stream()
+                               .map(HiveCatalog::createPartitionSpec).collect(Collectors.toList());
+               } catch (TException e) {
+                       throw new CatalogException(
+                               String.format("Failed to list partitions of table %s", tablePath), e);
+               }
        }
 
        @Override
        public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
                        throws TableNotExistException, TableNotPartitionedException, CatalogException {
-               throw new UnsupportedOperationException();
+               checkNotNull(tablePath, "Table path cannot be null");
+               checkNotNull(partitionSpec, "CatalogPartitionSpec cannot be null");
+
+               Table hiveTable = getHiveTable(tablePath);
+
+               ensurePartitionedTable(tablePath, hiveTable);
+
+               try {
+                       // partition spec can be partial
+                       List<String> partialVals = MetaStoreUtils.getPvals(hiveTable.getPartitionKeys(), partitionSpec.getPartitionSpec());
+                       return client.listPartitionNames(tablePath.getDatabaseName(), tablePath.getObjectName(), partialVals,
+                               (short) -1).stream().map(HiveCatalog::createPartitionSpec).collect(Collectors.toList());
+               } catch (TException e) {
+                       throw new CatalogException(
+                               String.format("Failed to list partitions of table %s", tablePath), e);
+               }
        }
 
        @Override
        public CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
                        throws PartitionNotExistException, CatalogException {
-               throw new UnsupportedOperationException();
+               checkNotNull(tablePath, "Table path cannot be null");
+               checkNotNull(partitionSpec, "CatalogPartitionSpec cannot be null");
+
+               try {
+                       Partition hivePartition = getHivePartition(tablePath, partitionSpec);
+                       return instantiateCatalogPartition(hivePartition);
+               } catch (NoSuchObjectException | MetaException | TableNotExistException | PartitionSpecInvalidException e) {
+                       throw new PartitionNotExistException(catalogName, tablePath, partitionSpec, e);
+               } catch (TException e) {
+                       throw new CatalogException(
+                               String.format("Failed to get partition %s of table %s", partitionSpec, tablePath), e);
+               }
        }
 
        @Override
-       public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws CatalogException {
-               throw new UnsupportedOperationException();
+       public void alterPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition newPartition, boolean ignoreIfNotExists)
+                       throws PartitionNotExistException, CatalogException {
+               checkNotNull(tablePath, "Table path cannot be null");
+               checkNotNull(partitionSpec, "CatalogPartitionSpec cannot be null");
+               checkNotNull(newPartition, "New partition cannot be null");
+
+               if (!(newPartition instanceof HiveCatalogPartition)) {
+                       throw new CatalogException("Currently only supports HiveCatalogPartition");
+               }
+
+               // Explicitly check if the partition exists or not
+               // because alter_partition() doesn't throw NoSuchObjectException like dropPartition() when the target doesn't exist
+               try {
+                       Table hiveTable = getHiveTable(tablePath);
+                       ensureTableAndPartitionMatch(hiveTable, newPartition);
+                       Partition oldHivePartition = getHivePartition(hiveTable, partitionSpec);
+                       if (oldHivePartition == null) {
+                               if (ignoreIfNotExists) {
+                                       return;
+                               }
+                               throw new PartitionNotExistException(catalogName, tablePath, partitionSpec);
+                       }
+                       Partition newHivePartition = instantiateHivePartition(hiveTable, partitionSpec, newPartition);
+                       if (newHivePartition.getSd().getLocation() == null) {
+                               newHivePartition.getSd().setLocation(oldHivePartition.getSd().getLocation());
+                       }
+                       client.alter_partition(
+                               tablePath.getDatabaseName(),
+                               tablePath.getObjectName(),
+                               newHivePartition
+                       );
+               } catch (NoSuchObjectException e) {
+                       if (!ignoreIfNotExists) {
+                               throw new PartitionNotExistException(catalogName, tablePath, partitionSpec, e);
+                       }
+               } catch (InvalidOperationException | MetaException | TableNotExistException | PartitionSpecInvalidException e) {
+                       throw new PartitionNotExistException(catalogName, tablePath, partitionSpec, e);
+               } catch (TException e) {
+                       throw new CatalogException(
+                               String.format("Failed to alter existing partition with new partition %s of table %s",
+                                       partitionSpec, tablePath), e);
+               }
+       }
+
+       // make sure both table and partition are generic, or neither is
+       private static void ensureTableAndPartitionMatch(Table hiveTable, CatalogPartition catalogPartition) {
+               boolean isGeneric = Boolean.valueOf(hiveTable.getParameters().get(FLINK_PROPERTY_IS_GENERIC));
+               if ((isGeneric && catalogPartition instanceof HiveCatalogPartition) ||
+                       (!isGeneric && catalogPartition instanceof GenericCatalogPartition)) {
+                       throw new CatalogException(String.format("Cannot handle %s partition for %s table",
+                               catalogPartition.getClass().getName(), isGeneric ? "generic" : "non-generic"));
+               }
+       }
+
+       private Partition instantiateHivePartition(Table hiveTable, CatalogPartitionSpec partitionSpec, CatalogPartition catalogPartition)
+                       throws PartitionSpecInvalidException {
+               Partition partition = new Partition();
+               List<String> partCols = getFieldNames(hiveTable.getPartitionKeys());
+               List<String> partValues = getOrderedFullPartitionValues(
+                       partitionSpec, partCols, new ObjectPath(hiveTable.getDbName(), hiveTable.getTableName()));
+               // validate partition values
+               for (int i = 0; i < partCols.size(); i++) {
+                       if (StringUtils.isNullOrWhitespaceOnly(partValues.get(i))) {
+                               throw new PartitionSpecInvalidException(catalogName, partCols,
+                                       new ObjectPath(hiveTable.getDbName(), hiveTable.getTableName()), partitionSpec);
+                       }
+               }
+               // TODO: handle GenericCatalogPartition
+               HiveCatalogPartition hiveCatalogPartition = (HiveCatalogPartition) catalogPartition;
+               partition.setValues(partValues);
+               partition.setDbName(hiveTable.getDbName());
+               partition.setTableName(hiveTable.getTableName());
+               partition.setCreateTime((int) (System.currentTimeMillis() / 1000));
+               partition.setParameters(hiveCatalogPartition.getProperties());
+               partition.setSd(hiveTable.getSd().deepCopy());
+               partition.getSd().setLocation(hiveCatalogPartition.getLocation());
+
+               return partition;
+       }
+
+       private static CatalogPartition instantiateCatalogPartition(Partition hivePartition) {
+               // TODO: create GenericCatalogPartition for GenericCatalogTable
+               return new HiveCatalogPartition(hivePartition.getParameters(), hivePartition.getSd().getLocation());
+       }
+
+       private void ensurePartitionedTable(ObjectPath tablePath, Table hiveTable) throws TableNotPartitionedException {
+               if (hiveTable.getPartitionKeysSize() == 0) {
+                       throw new TableNotPartitionedException(catalogName, tablePath);
+               }
+       }
+
+       /**
+        * Get field names from field schemas.
+        */
+       private static List<String> getFieldNames(List<FieldSchema> fieldSchemas) {
+               List<String> names = new ArrayList<>(fieldSchemas.size());
+               for (FieldSchema fs : fieldSchemas) {
+                       names.add(fs.getName());
+               }
+               return names;
+       }
+
+       /**
+        * Creates a {@link CatalogPartitionSpec} from a Hive partition name string.
+        * Example of Hive partition name string - "name=bob/year=2019"
+        */
+       private static CatalogPartitionSpec createPartitionSpec(String hivePartitionName) {
+               String[] partKeyVals = hivePartitionName.split("/");
+               Map<String, String> spec = new HashMap<>(partKeyVals.length);
+               for (String keyVal : partKeyVals) {
+                       String[] kv = keyVal.split("=");
+                       spec.put(kv[0], kv[1]);
+               }
+               return new CatalogPartitionSpec(spec);
+       }
+
+       /**
+        * Get a list of ordered partition values by re-arranging them based on the given list of partition keys.
+        *
+        * @param partitionSpec a partition spec.
+        * @param partitionKeys a list of partition keys.
+        * @param tablePath path of the table to which the partition belongs.
+        * @return A list of partition values ordered according to partitionKeys.
+        * @throws PartitionSpecInvalidException thrown if partitionSpec and partitionKeys have different sizes,
+        *                                       or any key in partitionKeys doesn't exist in partitionSpec.
+        */
+       private List<String> getOrderedFullPartitionValues(CatalogPartitionSpec partitionSpec, List<String> partitionKeys, ObjectPath tablePath)
+                       throws PartitionSpecInvalidException {
+               Map<String, String> spec = partitionSpec.getPartitionSpec();
+               if (spec.size() != partitionKeys.size()) {
+                       throw new PartitionSpecInvalidException(catalogName, partitionKeys, tablePath, partitionSpec);
+               }
+
+               List<String> values = new ArrayList<>(spec.size());
+               for (String key : partitionKeys) {
+                       if (!spec.containsKey(key)) {
+                               throw new PartitionSpecInvalidException(catalogName, partitionKeys, tablePath, partitionSpec);
+                       } else {
+                               values.add(spec.get(key));
+                       }
+               }
+
+               return values;
+       }
+
+       private Partition getHivePartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+                       throws TableNotExistException, PartitionSpecInvalidException, TException {
+               return getHivePartition(getHiveTable(tablePath), partitionSpec);
+       }
+
+       private Partition getHivePartition(Table hiveTable, CatalogPartitionSpec partitionSpec)
+                       throws PartitionSpecInvalidException, TException {
+               return client.getPartition(hiveTable.getDbName(), hiveTable.getTableName(),
+                       getOrderedFullPartitionValues(partitionSpec, getFieldNames(hiveTable.getPartitionKeys()),
+                               new ObjectPath(hiveTable.getDbName(), hiveTable.getTableName())));
        }
 
        // ------ functions ------
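
The private helpers above translate between Hive's "k1=v1/k2=v2" partition-name
strings, the unordered map inside a CatalogPartitionSpec, and the value lists
(ordered by the table's partition keys) that the metastore client expects. A
self-contained sketch of that conversion, with key names chosen only for
illustration:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    class PartitionNameSketch {
        public static void main(String[] args) {
            // Hive partition name -> spec map, mirroring createPartitionSpec() above
            String hivePartitionName = "name=bob/year=2019";
            Map<String, String> spec = new HashMap<>();
            for (String keyVal : hivePartitionName.split("/")) {
                String[] kv = keyVal.split("=");
                spec.put(kv[0], kv[1]);
            }

            // spec map -> partition values ordered by the table's partition keys,
            // mirroring getOrderedFullPartitionValues() above
            List<String> partitionKeys = Arrays.asList("name", "year");
            List<String> orderedValues = new ArrayList<>(partitionKeys.size());
            for (String key : partitionKeys) {
                orderedValues.add(spec.get(key));
            }

            System.out.println(orderedValues); // prints [bob, 2019]
        }
    }
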
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogPartition.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogPartition.java
new file mode 100644
index 0000000..98b13a2
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogPartition.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.hive;
+
+import org.apache.flink.table.catalog.AbstractCatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartition;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * A CatalogPartition implementation that represents a Partition in Hive.
+ */
+public class HiveCatalogPartition extends AbstractCatalogPartition {
+       private final String location;
+
+       public HiveCatalogPartition(Map<String, String> properties, String location) {
+               super(properties, null);
+               this.location = location;
+       }
+
+       public HiveCatalogPartition(Map<String, String> properties) {
+               this(properties, null);
+       }
+
+       public String getLocation() {
+               return location;
+       }
+
+       @Override
+       public CatalogPartition copy() {
+               return new HiveCatalogPartition(new HashMap<>(getProperties()), location);
+       }
+
+       @Override
+       public Optional<String> getDescription() {
+               return Optional.empty();
+       }
+
+       @Override
+       public Optional<String> getDetailedDescription() {
+               return Optional.empty();
+       }
+}
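
A HiveCatalogPartition carries only the partition properties plus an optional
storage location. A brief usage sketch; the property key and HDFS path below
are made up for the example:

    import org.apache.flink.table.catalog.CatalogPartition;
    import org.apache.flink.table.catalog.hive.HiveCatalogPartition;

    import java.util.HashMap;
    import java.util.Map;

    class HiveCatalogPartitionSketch {
        public static void main(String[] args) {
            Map<String, String> props = new HashMap<>();
            props.put("created-by", "example");

            // explicit partition location
            HiveCatalogPartition located =
                new HiveCatalogPartition(props, "hdfs:///warehouse/mydb.db/mytable/name=bob");

            // no explicit location; a default is derived instead
            // (see instantiateHivePartition() and alterPartition() above)
            HiveCatalogPartition unlocated = new HiveCatalogPartition(props);

            CatalogPartition copy = located.copy(); // copies the property map, keeps the location
        }
    }
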
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogGenericMetadataTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogGenericMetadataTest.java
index 9a35068..90ec11e 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogGenericMetadataTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogGenericMetadataTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.CatalogDatabase;
 import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogPartition;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.CatalogTestBase;
 import org.apache.flink.table.catalog.CatalogView;
@@ -92,6 +93,108 @@ public class HiveCatalogGenericMetadataTest extends CatalogTestBase {
                checkEquals(table, (CatalogTable) catalog.getTable(path1));
        }
 
+       // ------ partitions ------
+
+       @Test
+       public void testCreatePartition() throws Exception {
+       }
+
+       @Test
+       public void testCreatePartition_TableNotExistException() throws Exception {
+       }
+
+       @Test
+       public void testCreatePartition_TableNotPartitionedException() throws Exception {
+       }
+
+       @Test
+       public void testCreatePartition_PartitionSpecInvalidException() throws Exception {
+       }
+
+       @Test
+       public void testCreatePartition_PartitionAlreadyExistsException() throws Exception {
+       }
+
+       @Test
+       public void testCreatePartition_PartitionAlreadyExists_ignored() throws Exception {
+       }
+
+       @Test
+       public void testDropPartition() throws Exception {
+       }
+
+       @Test
+       public void testDropPartition_TableNotExist() throws Exception {
+       }
+
+       @Test
+       public void testDropPartition_TableNotPartitioned() throws Exception {
+       }
+
+       @Test
+       public void testDropPartition_PartitionSpecInvalid() throws Exception {
+       }
+
+       @Test
+       public void testDropPartition_PartitionNotExist() throws Exception {
+       }
+
+       @Test
+       public void testDropPartition_PartitionNotExist_ignored() throws Exception {
+       }
+
+       @Test
+       public void testAlterPartition() throws Exception {
+       }
+
+       @Test
+       public void testAlterPartition_TableNotExist() throws Exception {
+       }
+
+       @Test
+       public void testAlterPartition_TableNotPartitioned() throws Exception {
+       }
+
+       @Test
+       public void testAlterPartition_PartitionSpecInvalid() throws Exception {
+       }
+
+       @Test
+       public void testAlterPartition_PartitionNotExist() throws Exception {
+       }
+
+       @Test
+       public void testAlterPartition_PartitionNotExist_ignored() throws Exception {
+       }
+
+       @Test
+       public void testGetPartition_TableNotExist() throws Exception {
+       }
+
+       @Test
+       public void testGetPartition_TableNotPartitioned() throws Exception {
+       }
+
+       @Test
+       public void testGetPartition_PartitionSpecInvalid_invalidPartitionSpec() throws Exception {
+       }
+
+       @Test
+       public void testGetPartition_PartitionSpecInvalid_sizeNotEqual() throws Exception {
+       }
+
+       @Test
+       public void testGetPartition_PartitionNotExist() throws Exception {
+       }
+
+       @Test
+       public void testPartitionExists() throws Exception {
+       }
+
+       @Test
+       public void testListPartitionPartialSpec() throws Exception {
+       }
+
        // ------ test utils ------
 
        @Override
@@ -183,4 +286,9 @@ public class HiveCatalogGenericMetadataTest extends CatalogTestBase {
        protected CatalogFunction createAnotherFunction() {
                return new GenericCatalogFunction(MyOtherScalarFunction.class.getName());
        }
+
+       @Override
+       public CatalogPartition createPartition() {
+               throw new UnsupportedOperationException();
+       }
 }
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java
index 1126564..e2d95a3 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.catalog.hive;
 
 import org.apache.flink.table.catalog.CatalogDatabase;
 import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogPartition;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.CatalogTestBase;
 import org.apache.flink.table.catalog.CatalogView;
@@ -144,6 +145,11 @@ public class HiveCatalogHiveMetadataTest extends CatalogTestBase {
        }
 
        @Override
+       public CatalogPartition createPartition() {
+               return new HiveCatalogPartition(getBatchTableProperties());
+       }
+
+       @Override
        public void checkEquals(CatalogTable t1, CatalogTable t2) {
                assertEquals(t1.getSchema(), t2.getSchema());
                assertEquals(t1.getComment(), t2.getComment());
@@ -155,6 +161,7 @@ public class HiveCatalogHiveMetadataTest extends CatalogTestBase {
                assertTrue(t2.getProperties().entrySet().containsAll(t1.getProperties().entrySet()));
        }
 
+       @Override
        protected void checkEquals(CatalogView v1, CatalogView v2) {
                assertEquals(v1.getSchema(), v1.getSchema());
                assertEquals(v1.getComment(), v2.getComment());
@@ -165,4 +172,15 @@ public class HiveCatalogHiveMetadataTest extends CatalogTestBase {
                // thus properties of Hive view is a super set of those in its corresponding Flink view
                assertTrue(v2.getProperties().entrySet().containsAll(v1.getProperties().entrySet()));
        }
+
+       @Override
+       protected void checkEquals(CatalogPartition expected, CatalogPartition actual) {
+               assertTrue(expected instanceof HiveCatalogPartition && actual instanceof HiveCatalogPartition);
+               assertEquals(expected.getClass(), actual.getClass());
+               HiveCatalogPartition hivePartition1 = (HiveCatalogPartition) expected;
+               HiveCatalogPartition hivePartition2 = (HiveCatalogPartition) actual;
+               assertEquals(hivePartition1.getDescription(), hivePartition2.getDescription());
+               assertEquals(hivePartition1.getDetailedDescription(), hivePartition2.getDetailedDescription());
+               assertTrue(hivePartition2.getProperties().entrySet().containsAll(hivePartition1.getProperties().entrySet()));
+       }
 }
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
index c90e1722..d46a423 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
@@ -344,6 +344,12 @@ public class GenericInMemoryCatalog implements Catalog {
                return databaseExists(tablePath.getDatabaseName()) && tables.containsKey(tablePath);
        }
 
+       private void ensureTableExists(ObjectPath tablePath) throws TableNotExistException {
+               if (!tableExists(tablePath)) {
+                       throw new TableNotExistException(catalogName, tablePath);
+               }
+       }
+
        // ------ functions ------
 
        @Override
@@ -437,13 +443,9 @@ public class GenericInMemoryCatalog implements Catalog {
                checkNotNull(partitionSpec);
                checkNotNull(partition);
 
-               if (!tableExists(tablePath)) {
-                       throw new TableNotExistException(catalogName, tablePath);
-               }
-
-               if (!isPartitionedTable(tablePath)) {
-                       throw new TableNotPartitionedException(catalogName, tablePath);
-               }
+               ensureTableExists(tablePath);
+               ensurePartitionedTable(tablePath);
+               ensureFullPartitionSpec(tablePath, partitionSpec);
 
                if (partitionExists(tablePath, partitionSpec)) {
                        if (!ignoreIfExists) {
@@ -451,11 +453,6 @@ public class GenericInMemoryCatalog implements Catalog {
                        }
                }
 
-               if (!isPartitionSpecValid(tablePath, partitionSpec)) {
-                       throw new PartitionSpecInvalidException(catalogName, ((CatalogTable) getTable(tablePath)).getPartitionKeys(),
-                               tablePath, partitionSpec);
-               }
-
                partitions.get(tablePath).put(partitionSpec, partition.copy());
        }
 
@@ -502,13 +499,8 @@ public class GenericInMemoryCatalog implements Catalog {
                        throws TableNotExistException, TableNotPartitionedException, CatalogException {
                checkNotNull(tablePath);
 
-               if (!tableExists(tablePath)) {
-                       throw new TableNotExistException(catalogName, tablePath);
-               }
-
-               if (!isPartitionedTable(tablePath)) {
-                       throw new TableNotPartitionedException(catalogName, tablePath);
-               }
+               ensureTableExists(tablePath);
+               ensurePartitionedTable(tablePath);
 
                return new ArrayList<>(partitions.get(tablePath).keySet());
        }
@@ -519,15 +511,12 @@ public class GenericInMemoryCatalog implements Catalog {
                checkNotNull(tablePath);
                checkNotNull(partitionSpec);
 
-               if (!tableExists(tablePath)) {
-                       throw new TableNotExistException(catalogName, tablePath);
-               }
-
-               if (!isPartitionedTable(tablePath)) {
-                       throw new TableNotPartitionedException(catalogName, tablePath);
-               }
+               ensurePartitionedTable(tablePath);
 
-               if (!isPartitionSpecValid(tablePath, partitionSpec)) {
+               CatalogTable catalogTable = (CatalogTable) getTable(tablePath);
+               List<String> partKeys = catalogTable.getPartitionKeys();
+               Map<String, String> spec = partitionSpec.getPartitionSpec();
+               if (!partKeys.containsAll(spec.keySet())) {
                        return new ArrayList<>();
                }
 
@@ -558,41 +547,36 @@ public class GenericInMemoryCatalog implements Catalog {
                return partitions.containsKey(tablePath) && partitions.get(tablePath).containsKey(partitionSpec);
        }
 
+       private void ensureFullPartitionSpec(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+               throws TableNotExistException, PartitionSpecInvalidException {
+               if (!isFullPartitionSpec(tablePath, partitionSpec)) {
+                       throw new PartitionSpecInvalidException(catalogName, ((CatalogTable) getTable(tablePath)).getPartitionKeys(),
+                               tablePath, partitionSpec);
+               }
+       }
+
        /**
-        * Check if the given partitionSpec is valid for the given table.
-        * Check if the given partitionSpec is valid for the given table.
-        * Note that partition spec is considered invalid if the table doesn't exist or isn't partitioned.
+        * Check if the given partitionSpec is full partition spec for the given table.
         */
-       private boolean isPartitionSpecValid(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) {
-               CatalogBaseTable baseTable;
-               try {
-                       baseTable = getTable(tablePath);
-               } catch (TableNotExistException e) {
-                       return false;
-               }
+       private boolean isFullPartitionSpec(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws TableNotExistException {
+               CatalogBaseTable baseTable = getTable(tablePath);
 
                if (!(baseTable instanceof CatalogTable)) {
                        return false;
                }
 
-               CatalogTable table =  (CatalogTable) baseTable;
+               CatalogTable table = (CatalogTable) baseTable;
                List<String> partitionKeys = table.getPartitionKeys();
                Map<String, String> spec = partitionSpec.getPartitionSpec();
 
                // The size of partition spec should not exceed the size of partition keys
-               if (partitionKeys.size() < spec.size()) {
-                       return false;
-               } else {
-                       int size = spec.size();
+               return partitionKeys.size() == spec.size() && spec.keySet().containsAll(partitionKeys);
+       }
 
-                       // PartitionSpec should contain the first 'size' number of keys in partition key list
-                       for (int i = 0; i < size; i++) {
-                               if (!spec.containsKey(partitionKeys.get(i))) {
-                                       return false;
-                               }
-                       }
+       private void ensurePartitionedTable(ObjectPath tablePath) throws TableNotPartitionedException {
+               if (!isPartitionedTable(tablePath)) {
+                       throw new TableNotPartitionedException(catalogName, tablePath);
                }
-
-               return true;
        }
 
        /**
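
The refactoring above replaces the prefix-based isPartitionSpecValid() check
with isFullPartitionSpec(): a spec is accepted for create/alter only when it
names exactly the table's partition keys, while listPartitions() with a partial
spec only requires the supplied keys to be a subset (otherwise it returns an
empty list). A standalone sketch of the new full-spec predicate; the key and
value literals are illustrative:

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    class FullSpecCheckSketch {
        // mirrors isFullPartitionSpec(): same size and every partition key present
        static boolean isFullSpec(List<String> partitionKeys, Map<String, String> spec) {
            return partitionKeys.size() == spec.size() && spec.keySet().containsAll(partitionKeys);
        }

        public static void main(String[] args) {
            List<String> partitionKeys = Arrays.asList("second", "third");

            Map<String, String> full = new HashMap<>();
            full.put("second", "bob");
            full.put("third", "2000");

            Map<String, String> partial = Collections.singletonMap("second", "bob");

            System.out.println(isFullSpec(partitionKeys, full));    // true
            System.out.println(isFullSpec(partitionKeys, partial)); // false -> PartitionSpecInvalidException
        }
    }
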
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
index d332456..89eb043 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
@@ -19,11 +19,6 @@
 package org.apache.flink.table.catalog;
 
 import org.apache.flink.table.catalog.exceptions.CatalogException;
-import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
-import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
-import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
-import org.apache.flink.table.catalog.exceptions.TableNotExistException;
-import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
 import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
 import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataBase;
 import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataBinary;
@@ -39,13 +34,10 @@ import org.apache.flink.table.functions.ScalarFunction;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 
-import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -100,194 +92,6 @@ public class GenericInMemoryCatalogTest extends CatalogTestBase {
        // ------ partitions ------
 
        @Test
-       public void testCreatePartition() throws Exception {
-               CatalogTable table = createPartitionedTable();
-               catalog.createDatabase(db1, createDb(), false);
-               catalog.createTable(path1, table, false);
-
-               assertTrue(catalog.listPartitions(path1).isEmpty());
-
-               CatalogPartitionSpec partitionSpec = createPartitionSpec();
-               catalog.createPartition(path1, partitionSpec, createPartition(), false);
-
-               assertEquals(Arrays.asList(partitionSpec), catalog.listPartitions(path1));
-               assertEquals(Arrays.asList(partitionSpec), catalog.listPartitions(path1, createPartitionSpecSubset()));
-               CatalogTestUtil.checkEquals(createPartition(), catalog.getPartition(path1, createPartitionSpec()));
-
-               CatalogPartitionSpec anotherPartitionSpec = createAnotherPartitionSpec();
-               CatalogPartition anotherPartition = createAnotherPartition();
-               catalog.createPartition(path1, anotherPartitionSpec, anotherPartition, false);
-
-               assertEquals(Arrays.asList(partitionSpec, anotherPartitionSpec), catalog.listPartitions(path1));
-               assertEquals(Arrays.asList(partitionSpec, anotherPartitionSpec), catalog.listPartitions(path1, createPartitionSpecSubset()));
-               CatalogTestUtil.checkEquals(anotherPartition, catalog.getPartition(path1, anotherPartitionSpec));
-
-               CatalogPartitionSpec invalid = createInvalidPartitionSpecSubset();
-               assertTrue(catalog.listPartitions(path1, invalid).isEmpty());
-       }
-
-       @Test
-       public void testCreatePartition_TableNotExistException() throws Exception {
-               catalog.createDatabase(db1, createDb(), false);
-
-               exception.expect(TableNotExistException.class);
-               exception.expectMessage(
-                       String.format("Table (or view) %s does not exist in Catalog %s.", path1.getFullName(), TEST_CATALOG_NAME));
-               catalog.createPartition(path1, createPartitionSpec(), createPartition(), false);
-       }
-
-       @Test
-       public void testCreatePartition_TableNotPartitionedException() throws Exception {
-               catalog.createDatabase(db1, createDb(), false);
-               catalog.createTable(path1, createTable(), false);
-
-               exception.expect(TableNotPartitionedException.class);
-               exception.expectMessage(
-                       String.format("Table %s in catalog %s is not partitioned.", path1.getFullName(), TEST_CATALOG_NAME));
-               catalog.createPartition(path1, createPartitionSpec(), createPartition(), false);
-       }
-
-       @Test
-       public void testCreatePartition_PartitionSpecInvalidException() throws Exception {
-               catalog.createDatabase(db1, createDb(), false);
-               CatalogTable table = createPartitionedTable();
-               catalog.createTable(path1, table, false);
-
-               CatalogPartitionSpec partitionSpec = createInvalidPartitionSpecSubset();
-               exception.expect(PartitionSpecInvalidException.class);
-               exception.expectMessage(
-                       String.format("PartitionSpec %s does not match partition keys %s of table %s in catalog %s.",
-                               partitionSpec, table.getPartitionKeys(), path1.getFullName(), TEST_CATALOG_NAME));
-               catalog.createPartition(path1, partitionSpec, createPartition(), false);
-       }
-
-       @Test
-       public void testCreatePartition_PartitionAlreadyExistsException() throws Exception {
-               catalog.createDatabase(db1, createDb(), false);
-               catalog.createTable(path1, createPartitionedTable(), false);
-               CatalogPartition partition = createPartition();
-               catalog.createPartition(path1, createPartitionSpec(), partition, false);
-
-               CatalogPartitionSpec partitionSpec = createPartitionSpec();
-
-               exception.expect(PartitionAlreadyExistsException.class);
-               exception.expectMessage(
-                       String.format("Partition %s of table %s in catalog %s already exists.",
-                               partitionSpec, path1.getFullName(), TEST_CATALOG_NAME));
-               catalog.createPartition(path1, partitionSpec, createPartition(), false);
-       }
-
-       @Test
-       public void testCreatePartition_PartitionAlreadyExists_ignored() throws Exception {
-               catalog.createDatabase(db1, createDb(), false);
-               catalog.createTable(path1, createPartitionedTable(), false);
-
-               CatalogPartitionSpec partitionSpec = createPartitionSpec();
-               catalog.createPartition(path1, partitionSpec, createPartition(), false);
-               catalog.createPartition(path1, partitionSpec, createPartition(), true);
-       }
-
-       @Test
-       public void testDropPartition() throws Exception {
-               catalog.createDatabase(db1, createDb(), false);
-               catalog.createTable(path1, createPartitionedTable(), false);
-               catalog.createPartition(path1, createPartitionSpec(), createPartition(), false);
-
-               assertEquals(Arrays.asList(createPartitionSpec()), catalog.listPartitions(path1));
-
-               catalog.dropPartition(path1, createPartitionSpec(), false);
-
-               assertEquals(Arrays.asList(), catalog.listPartitions(path1));
-       }
-
-       @Test
-       public void testDropPartition_PartitionNotExistException_TableNotExist() throws Exception {
-               catalog.createDatabase(db1, createDb(), false);
-               CatalogPartitionSpec partitionSpec = createPartitionSpec();
-
-               exception.expect(PartitionNotExistException.class);
-               exception.expectMessage(
-                       String.format("Partition %s of table %s in catalog %s does not exist.",
-                               partitionSpec, path1.getFullName(), TEST_CATALOG_NAME));
-               catalog.dropPartition(path1, partitionSpec, false);
-       }
-
-       @Test
-       public void testDropPartition_PartitionNotExistException_TableNotPartitioned() throws Exception {
-               catalog.createDatabase(db1, createDb(), false);
-               catalog.createTable(path1, createTable(), false);
-               CatalogPartitionSpec partitionSpec = createPartitionSpec();
-
-               exception.expect(PartitionNotExistException.class);
-               exception.expectMessage(
-                       String.format("Partition %s of table %s in catalog %s does not exist.",
-                               partitionSpec, path1.getFullName(), TEST_CATALOG_NAME));
-               catalog.dropPartition(path1, partitionSpec, false);
-       }
-
-       @Test
-       public void testDropPartition_PartitionNotExistException_PartitionSpecInvalid() throws Exception {
-               catalog.createDatabase(db1, createDb(), false);
-               CatalogTable table = createPartitionedTable();
-               catalog.createTable(path1, table, false);
-
-               CatalogPartitionSpec partitionSpec = createInvalidPartitionSpecSubset();
-               exception.expect(PartitionNotExistException.class);
-               exception.expectMessage(
-                       String.format("Partition %s of table %s in catalog %s does not exist.",
-                               partitionSpec, path1.getFullName(), TEST_CATALOG_NAME));
-               catalog.dropPartition(path1, partitionSpec, false);
-       }
-
-       @Test
-       public void testDropPartition_PartitionNotExistException() throws Exception {
-               catalog.createDatabase(db1, createDb(), false);
-               catalog.createTable(path1, createPartitionedTable(), false);
-
-               CatalogPartitionSpec partitionSpec = createPartitionSpec();
-               exception.expect(PartitionNotExistException.class);
-               exception.expectMessage(
-                       String.format("Partition %s of table %s in catalog %s does not exist.", partitionSpec, path1.getFullName(), TEST_CATALOG_NAME));
-               catalog.dropPartition(path1, partitionSpec, false);
-       }
-
-       @Test
-       public void testDropPartition_PartitionNotExist_ignored() throws Exception {
-               catalog.createDatabase(db1, createDb(), false);
-               catalog.createTable(path1, createPartitionedTable(), false);
-               catalog.dropPartition(path1, createPartitionSpec(), true);
-       }
-
-       @Test
-       public void testAlterPartition() throws Exception {
-               catalog.createDatabase(db1, createDb(), false);
-               catalog.createTable(path1, createPartitionedTable(), false);
-
-               CatalogPartitionSpec partitionSpec = createPartitionSpec();
-               catalog.createPartition(path1, partitionSpec, createPartition(), false);
-
-               assertEquals(Arrays.asList(partitionSpec), catalog.listPartitions(path1));
-
-               CatalogPartition cp = catalog.getPartition(path1, createPartitionSpec());
-               CatalogTestUtil.checkEquals(createPartition(), cp);
-
-               assertNull(cp.getProperties().get("k"));
-
-               Map<String, String> partitionProperties = getBatchTableProperties();
-               partitionProperties.put("k", "v");
-
-               CatalogPartition another = createPartition(partitionProperties);
-               catalog.alterPartition(path1, createPartitionSpec(), another, false);
-
-               assertEquals(Arrays.asList(createPartitionSpec()), catalog.listPartitions(path1));
-
-               cp = catalog.getPartition(path1, createPartitionSpec());
-               CatalogTestUtil.checkEquals(another, cp);
-
-               assertEquals("v", cp.getProperties().get("k"));
-       }
-
-       @Test
        public void testAlterPartition_differentTypedPartition() throws Exception {
                catalog.createDatabase(db1, createDb(), false);
                catalog.createTable(path1, createPartitionedTable(), false);
@@ -305,140 +109,6 @@ public class GenericInMemoryCatalogTest extends CatalogTestBase {
                catalog.alterPartition(path1, partitionSpec, new TestPartition(), false);
        }
 
-       @Test
-       public void testAlterPartition_PartitionNotExistException_TableNotExist() throws Exception {
-               catalog.createDatabase(db1, createDb(), false);
-
-               CatalogPartitionSpec partitionSpec = createPartitionSpec();
-               exception.expect(PartitionNotExistException.class);
-               exception.expectMessage(
-                       String.format("Partition %s of table %s in catalog %s does not exist.",
-                               partitionSpec, path1.getFullName(), TEST_CATALOG_NAME));
-               catalog.alterPartition(path1, partitionSpec, createPartition(), false);
-       }
-
-       @Test
-       public void testAlterPartition_PartitionNotExistException_TableNotPartitioned() throws Exception {
-               catalog.createDatabase(db1, createDb(), false);
-               catalog.createTable(path1, createTable(), false);
-
-               CatalogPartitionSpec partitionSpec = createPartitionSpec();
-               exception.expect(PartitionNotExistException.class);
-               exception.expectMessage(
-                       String.format("Partition %s of table %s in catalog %s does not exist.",
-                               partitionSpec, path1.getFullName(), TEST_CATALOG_NAME));
-               catalog.alterPartition(path1, partitionSpec, createPartition(), false);
-       }
-
-       @Test
-       public void testAlterPartition_PartitionNotExistException_PartitionSpecInvalid() throws Exception {
-               catalog.createDatabase(db1, createDb(), false);
-               CatalogTable table = createPartitionedTable();
-               catalog.createTable(path1, table, false);
-
-               CatalogPartitionSpec partitionSpec = createInvalidPartitionSpecSubset();
-               exception.expect(PartitionNotExistException.class);
-               exception.expectMessage(
-                       String.format("Partition %s of table %s in catalog %s does not exist.",
-                               partitionSpec, path1.getFullName(), TEST_CATALOG_NAME));
-               catalog.alterPartition(path1, partitionSpec, createPartition(), false);
-       }
-
-       @Test
-       public void testAlterPartition_PartitionNotExistException() throws Exception {
-               catalog.createDatabase(db1, createDb(), false);
-               catalog.createTable(path1, createPartitionedTable(), false);
-
-               CatalogPartition catalogPartition = createPartition();
-               CatalogPartitionSpec partitionSpec = createPartitionSpec();
-               exception.expect(PartitionNotExistException.class);
-               exception.expectMessage(
-                       String.format("Partition %s of table %s in catalog %s does not exist.",
-                               partitionSpec, path1.getFullName(), TEST_CATALOG_NAME));
-               catalog.alterPartition(path1, partitionSpec, catalogPartition, false);
-       }
-
-       @Test
-       public void testAlterPartition_PartitionNotExist_ignored() throws Exception {
-               catalog.createDatabase(db1, createDb(), false);
-               catalog.createTable(path1, createPartitionedTable(), false);
-               catalog.alterPartition(path1, createPartitionSpec(), createPartition(), true);
-       }
-
-       @Test
-       public void testGetPartition_PartitionNotExistException_TableNotExist() throws Exception {
-               exception.expect(PartitionNotExistException.class);
-               catalog.getPartition(path1, createPartitionSpec());
-       }
-
-       @Test
-       public void testGetPartition_PartitionNotExistException_TableNotPartitioned() throws Exception {
-               catalog.createDatabase(db1, createDb(), false);
-               catalog.createTable(path1, createTable(), false);
-               CatalogPartitionSpec partitionSpec = createPartitionSpec();
-               exception.expect(PartitionNotExistException.class);
-               exception.expectMessage(
-                       String.format("Partition %s of table %s in catalog %s does not exist.", partitionSpec,
-                               path1.getFullName(), TEST_CATALOG_NAME));
-               catalog.getPartition(path1, partitionSpec);
-       }
-
-       @Test
-       public void testGetPartition_PartitionSpecInvalidException_invalidPartitionSpec() throws Exception {
-               catalog.createDatabase(db1, createDb(), false);
-               CatalogTable table = createPartitionedTable();
-               catalog.createTable(path1, table, false);
-
-               CatalogPartitionSpec partitionSpec = createInvalidPartitionSpecSubset();
-               exception.expect(PartitionNotExistException.class);
-               exception.expectMessage(
-                       String.format("Partition %s of table %s in catalog %s does not exist.",
-                               partitionSpec, path1.getFullName(), TEST_CATALOG_NAME));
-               catalog.getPartition(path1, partitionSpec);
-       }
-
-       @Test
-       public void testGetPartition_PartitionNotExistException_PartitionSpecInvalid_sizeNotEqual() throws Exception {
-               catalog.createDatabase(db1, createDb(), false);
-               CatalogTable table = createPartitionedTable();
-               catalog.createTable(path1, table, false);
-
-               CatalogPartitionSpec partitionSpec = new CatalogPartitionSpec(
-                       new HashMap<String, String>() {{
-                               put("second", "bob");
-                       }}
-               );
-               exception.expect(PartitionNotExistException.class);
-               exception.expectMessage(
-                       String.format("Partition %s of table %s in catalog %s does not exist.",
-                               partitionSpec, path1.getFullName(), TEST_CATALOG_NAME));
-               catalog.getPartition(path1, partitionSpec);
-       }
-
-       @Test
-       public void testGetPartition_PartitionNotExistException() throws Exception {
-               catalog.createDatabase(db1, createDb(), false);
-               catalog.createTable(path1, createPartitionedTable(), false);
-
-               CatalogPartitionSpec partitionSpec = createPartitionSpec();
-               exception.expect(PartitionNotExistException.class);
-               exception.expectMessage(
-                       String.format("Partition %s of table %s in catalog %s does not exist.",
-                               partitionSpec, path1.getFullName(), TEST_CATALOG_NAME));
-               catalog.getPartition(path1, partitionSpec);
-       }
-
-       @Test
-       public void testPartitionExists() throws Exception {
-               catalog.createDatabase(db1, createDb(), false);
-               catalog.createTable(path1, createPartitionedTable(), false);
-               catalog.createPartition(path1, createPartitionSpec(), createPartition(), false);
-
-               assertTrue(catalog.partitionExists(path1, createPartitionSpec()));
-               assertFalse(catalog.partitionExists(path2, createPartitionSpec()));
-               assertFalse(catalog.partitionExists(ObjectPath.fromString("non.exist"), createPartitionSpec()));
-       }
-
        // ------ statistics ------
 
        @Test
@@ -542,48 +212,11 @@ public class GenericInMemoryCatalogTest extends CatalogTestBase {
                        TEST_COMMENT);
        }
 
-       private CatalogPartitionSpec createPartitionSpec() {
-               return new CatalogPartitionSpec(
-                       new HashMap<String, String>() {{
-                               put("third", "2000");
-                               put("second", "bob");
-                       }});
-       }
-
-       private CatalogPartitionSpec createAnotherPartitionSpec() {
-               return new CatalogPartitionSpec(
-                       new HashMap<String, String>() {{
-                               put("third", "2010");
-                               put("second", "bob");
-                       }});
-       }
-
-       private CatalogPartitionSpec createPartitionSpecSubset() {
-               return new CatalogPartitionSpec(
-                       new HashMap<String, String>() {{
-                               put("second", "bob");
-                       }});
-       }
-
-       private CatalogPartitionSpec createInvalidPartitionSpecSubset() {
-               return new CatalogPartitionSpec(
-                       new HashMap<String, String>() {{
-                               put("third", "2010");
-                       }});
-       }
-
-       private CatalogPartition createPartition() {
-               return new GenericCatalogPartition(getBatchTableProperties(), "Generic batch table");
-       }
-
-       private CatalogPartition createAnotherPartition() {
+       @Override
+       public CatalogPartition createPartition() {
                return new GenericCatalogPartition(getBatchTableProperties(), "Generic batch table");
        }
 
-       private CatalogPartition createPartition(Map<String, String> props) {
-               return new GenericCatalogPartition(props, "Generic catalog table");
-       }
-
        @Override
        public CatalogView createView() {
                return new GenericCatalogView(
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java
index cd5be1a..20c4b7e 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java
@@ -273,7 +273,7 @@ public interface Catalog {
         * @param partitionSpec partition spec of partition to get
         * @return the requested partition
         *
-        * @throws PartitionNotExistException thrown if the partition is not partitioned
+        * @throws PartitionNotExistException thrown if the partition doesn't exist
         * @throws CatalogException     in case of any runtime exception
         */
        CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java
index 5bfb98a..88d195e 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestBase.java
@@ -27,8 +27,12 @@ import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
 import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
 import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
 import org.apache.flink.table.functions.ScalarFunction;
 
 import org.junit.After;
@@ -38,6 +42,7 @@ import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -47,6 +52,7 @@ import java.util.Optional;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -753,6 +759,330 @@ public abstract class CatalogTestBase {
                catalog.dropDatabase(db1, false);
        }
 
+       // ------ partitions ------
+
+       @Test
+       public void testCreatePartition() throws Exception {
+               catalog.createDatabase(db1, createDb(), false);
+               catalog.createTable(path1, createPartitionedTable(), false);
+
+               assertTrue(catalog.listPartitions(path1).isEmpty());
+
+               catalog.createPartition(path1, createPartitionSpec(), createPartition(), false);
+
+               assertEquals(Collections.singletonList(createPartitionSpec()), catalog.listPartitions(path1));
+               assertEquals(Collections.singletonList(createPartitionSpec()), catalog.listPartitions(path1, createPartitionSpecSubset()));
+               checkEquals(createPartition(), catalog.getPartition(path1, createPartitionSpec()));
+
+               catalog.createPartition(path1, createAnotherPartitionSpec(), createPartition(), false);
+
+               assertEquals(Arrays.asList(createPartitionSpec(), createAnotherPartitionSpec()), catalog.listPartitions(path1));
+               assertEquals(Arrays.asList(createPartitionSpec(), createAnotherPartitionSpec()), catalog.listPartitions(path1, createPartitionSpecSubset()));
+               checkEquals(createPartition(), catalog.getPartition(path1, createAnotherPartitionSpec()));
+       }
+
+       @Test
+       public void testCreatePartition_TableNotExistException() throws Exception {
+               catalog.createDatabase(db1, createDb(), false);
+
+               exception.expect(TableNotExistException.class);
+               exception.expectMessage(
+                       String.format("Table (or view) %s does not exist in Catalog %s.", path1.getFullName(), TEST_CATALOG_NAME));
+               catalog.createPartition(path1, createPartitionSpec(), createPartition(), false);
+       }
+
+       @Test
+       public void testCreatePartition_TableNotPartitionedException() throws Exception {
+               catalog.createDatabase(db1, createDb(), false);
+               catalog.createTable(path1, createTable(), false);
+
+               exception.expect(TableNotPartitionedException.class);
+               exception.expectMessage(
+                       String.format("Table %s in catalog %s is not partitioned.", path1.getFullName(), TEST_CATALOG_NAME));
+               catalog.createPartition(path1, createPartitionSpec(), createPartition(), false);
+       }
+
+       @Test
+       public void testCreatePartition_PartitionSpecInvalidException() throws Exception {
+               catalog.createDatabase(db1, createDb(), false);
+               CatalogTable table = createPartitionedTable();
+               catalog.createTable(path1, table, false);
+
+               CatalogPartitionSpec partitionSpec = createInvalidPartitionSpecSubset();
+               exception.expect(PartitionSpecInvalidException.class);
+               exception.expectMessage(
+                       String.format("PartitionSpec %s does not match partition keys %s of table %s in catalog %s.",
+                               partitionSpec, table.getPartitionKeys(), path1.getFullName(), TEST_CATALOG_NAME));
+               catalog.createPartition(path1, partitionSpec, createPartition(), false);
+       }
+
+       @Test
+       public void testCreatePartition_PartitionAlreadyExistsException() throws Exception {
+               catalog.createDatabase(db1, createDb(), false);
+               catalog.createTable(path1, createPartitionedTable(), false);
+               CatalogPartition partition = createPartition();
+               catalog.createPartition(path1, createPartitionSpec(), partition, false);
+
+               CatalogPartitionSpec partitionSpec = createPartitionSpec();
+
+               exception.expect(PartitionAlreadyExistsException.class);
+               exception.expectMessage(
+                       String.format("Partition %s of table %s in catalog %s already exists.",
+                               partitionSpec, path1.getFullName(), TEST_CATALOG_NAME));
+               catalog.createPartition(path1, partitionSpec, createPartition(), false);
+       }
+
+       @Test
+       public void testCreatePartition_PartitionAlreadyExists_ignored() throws Exception {
+               catalog.createDatabase(db1, createDb(), false);
+               catalog.createTable(path1, createPartitionedTable(), false);
+
+               CatalogPartitionSpec partitionSpec = createPartitionSpec();
+               catalog.createPartition(path1, partitionSpec, createPartition(), false);
+               catalog.createPartition(path1, partitionSpec, createPartition(), true);
+       }
+
+       @Test
+       public void testDropPartition() throws Exception {
+               catalog.createDatabase(db1, createDb(), false);
+               catalog.createTable(path1, createPartitionedTable(), false);
+               catalog.createPartition(path1, createPartitionSpec(), createPartition(), false);
+
+               assertEquals(Collections.singletonList(createPartitionSpec()), catalog.listPartitions(path1));
+
+               catalog.dropPartition(path1, createPartitionSpec(), false);
+
+               assertEquals(Collections.emptyList(), catalog.listPartitions(path1));
+       }
+
+       @Test
+       public void testDropPartition_TableNotExist() throws Exception {
+               catalog.createDatabase(db1, createDb(), false);
+
+               CatalogPartitionSpec partitionSpec = createPartitionSpec();
+               exception.expect(PartitionNotExistException.class);
+               exception.expectMessage(
+                       String.format("Partition %s of table %s in catalog %s does not exist.",
+                               partitionSpec, path1.getFullName(), TEST_CATALOG_NAME));
+               catalog.dropPartition(path1, partitionSpec, false);
+       }
+
+       @Test
+       public void testDropPartition_TableNotPartitioned() throws Exception {
+               catalog.createDatabase(db1, createDb(), false);
+               catalog.createTable(path1, createTable(), false);
+
+               CatalogPartitionSpec partitionSpec = createPartitionSpec();
+               exception.expect(PartitionNotExistException.class);
+               exception.expectMessage(
+                       String.format("Partition %s of table %s in catalog %s does not exist.",
+                               partitionSpec, path1.getFullName(), TEST_CATALOG_NAME));
+               catalog.dropPartition(path1, partitionSpec, false);
+       }
+
+       @Test
+       public void testDropPartition_PartitionSpecInvalid() throws Exception {
+               catalog.createDatabase(db1, createDb(), false);
+               CatalogTable table = createPartitionedTable();
+               catalog.createTable(path1, table, false);
+
+               CatalogPartitionSpec partitionSpec = createInvalidPartitionSpecSubset();
+               exception.expect(PartitionNotExistException.class);
+               exception.expectMessage(
+                       String.format("Partition %s of table %s in catalog %s does not exist.",
+                               partitionSpec, path1.getFullName(), TEST_CATALOG_NAME));
+               catalog.dropPartition(path1, partitionSpec, false);
+       }
+
+       @Test
+       public void testDropPartition_PartitionNotExist() throws Exception {
+               catalog.createDatabase(db1, createDb(), false);
+               catalog.createTable(path1, createPartitionedTable(), false);
+
+               CatalogPartitionSpec partitionSpec = createPartitionSpec();
+               exception.expect(PartitionNotExistException.class);
+               exception.expectMessage(
+                       String.format("Partition %s of table %s in catalog %s does not exist.", partitionSpec, path1.getFullName(), TEST_CATALOG_NAME));
+               catalog.dropPartition(path1, partitionSpec, false);
+       }
+
+       @Test
+       public void testDropPartition_PartitionNotExist_ignored() throws Exception {
+               catalog.createDatabase(db1, createDb(), false);
+               catalog.createTable(path1, createPartitionedTable(), false);
+               catalog.dropPartition(path1, createPartitionSpec(), true);
+       }
+
+       @Test
+       public void testAlterPartition() throws Exception {
+               catalog.createDatabase(db1, createDb(), false);
+               catalog.createTable(path1, createPartitionedTable(), false);
+               catalog.createPartition(path1, createPartitionSpec(), createPartition(), false);
+
+               assertEquals(Collections.singletonList(createPartitionSpec()), catalog.listPartitions(path1));
+               CatalogPartition cp = catalog.getPartition(path1, createPartitionSpec());
+               checkEquals(createPartition(), cp);
+               assertNull(cp.getProperties().get("k"));
+
+               CatalogPartition another = createPartition();
+               another.getProperties().put("k", "v");
+               catalog.alterPartition(path1, createPartitionSpec(), another, false);
+
+               assertEquals(Collections.singletonList(createPartitionSpec()), catalog.listPartitions(path1));
+               cp = catalog.getPartition(path1, createPartitionSpec());
+               checkEquals(another, cp);
+               assertEquals("v", cp.getProperties().get("k"));
+       }
+
+       @Test
+       public void testAlterPartition_TableNotExist() throws Exception {
+               catalog.createDatabase(db1, createDb(), false);
+
+               CatalogPartitionSpec partitionSpec = createPartitionSpec();
+               exception.expect(PartitionNotExistException.class);
+               exception.expectMessage(
+                       String.format("Partition %s of table %s in catalog %s does not exist.",
+                               partitionSpec, path1.getFullName(), TEST_CATALOG_NAME));
+               catalog.alterPartition(path1, partitionSpec, createPartition(), false);
+       }
+
+       @Test
+       public void testAlterPartition_TableNotPartitioned() throws Exception {
+               catalog.createDatabase(db1, createDb(), false);
+               catalog.createTable(path1, createTable(), false);
+
+               CatalogPartitionSpec partitionSpec = createPartitionSpec();
+               exception.expect(PartitionNotExistException.class);
+               exception.expectMessage(
+                       String.format("Partition %s of table %s in catalog %s does not exist.",
+                               partitionSpec, path1.getFullName(), TEST_CATALOG_NAME));
+               catalog.alterPartition(path1, partitionSpec, createPartition(), false);
+       }
+
+       @Test
+       public void testAlterPartition_PartitionSpecInvalid() throws Exception {
+               catalog.createDatabase(db1, createDb(), false);
+               CatalogTable table = createPartitionedTable();
+               catalog.createTable(path1, table, false);
+
+               CatalogPartitionSpec partitionSpec = createInvalidPartitionSpecSubset();
+               exception.expect(PartitionNotExistException.class);
+               exception.expectMessage(
+                       String.format("Partition %s of table %s in catalog %s does not exist.",
+                               partitionSpec, path1.getFullName(), TEST_CATALOG_NAME));
+               catalog.alterPartition(path1, partitionSpec, createPartition(), false);
+       }
+
+       @Test
+       public void testAlterPartition_PartitionNotExist() throws Exception {
+               catalog.createDatabase(db1, createDb(), false);
+               catalog.createTable(path1, createPartitionedTable(), false);
+
+               CatalogPartitionSpec partitionSpec = createPartitionSpec();
+               exception.expect(PartitionNotExistException.class);
+               exception.expectMessage(
+                       String.format("Partition %s of table %s in catalog %s does not exist.",
+                               partitionSpec, path1.getFullName(), TEST_CATALOG_NAME));
+               catalog.alterPartition(path1, partitionSpec, createPartition(), false);
+       }
+
+       @Test
+       public void testAlterPartition_PartitionNotExist_ignored() throws Exception {
+               catalog.createDatabase(db1, createDb(), false);
+               catalog.createTable(path1, createPartitionedTable(), false);
+               catalog.alterPartition(path1, createPartitionSpec(), createPartition(), true);
+       }
+
+       @Test
+       public void testGetPartition_TableNotExist() throws Exception {
+               CatalogPartitionSpec partitionSpec = createPartitionSpec();
+               exception.expect(PartitionNotExistException.class);
+               exception.expectMessage(
+                       String.format("Partition %s of table %s in catalog %s does not exist.", partitionSpec,
+                               path1.getFullName(), TEST_CATALOG_NAME));
+               catalog.getPartition(path1, partitionSpec);
+       }
+
+       @Test
+       public void testGetPartition_TableNotPartitioned() throws Exception {
+               catalog.createDatabase(db1, createDb(), false);
+               catalog.createTable(path1, createTable(), false);
+
+               CatalogPartitionSpec partitionSpec = createPartitionSpec();
+               exception.expect(PartitionNotExistException.class);
+               exception.expectMessage(
+                       String.format("Partition %s of table %s in catalog %s does not exist.", partitionSpec,
+                               path1.getFullName(), TEST_CATALOG_NAME));
+               catalog.getPartition(path1, partitionSpec);
+       }
+
+       @Test
+       public void testGetPartition_PartitionSpecInvalid_invalidPartitionSpec() throws Exception {
+               catalog.createDatabase(db1, createDb(), false);
+               CatalogTable table = createPartitionedTable();
+               catalog.createTable(path1, table, false);
+
+               CatalogPartitionSpec partitionSpec = createInvalidPartitionSpecSubset();
+               exception.expect(PartitionNotExistException.class);
+               exception.expectMessage(
+                       String.format("Partition %s of table %s in catalog %s does not exist.",
+                               partitionSpec, path1.getFullName(), TEST_CATALOG_NAME));
+               catalog.getPartition(path1, partitionSpec);
+       }
+
+       @Test
+       public void testGetPartition_PartitionSpecInvalid_sizeNotEqual() throws Exception {
+               catalog.createDatabase(db1, createDb(), false);
+               CatalogTable table = createPartitionedTable();
+               catalog.createTable(path1, table, false);
+
+               CatalogPartitionSpec partitionSpec = new CatalogPartitionSpec(
+                       new HashMap<String, String>() {{
+                               put("second", "bob");
+                       }}
+               );
+               exception.expect(PartitionNotExistException.class);
+               exception.expectMessage(
+                       String.format("Partition %s of table %s in catalog %s does not exist.",
+                               partitionSpec, path1.getFullName(), TEST_CATALOG_NAME));
+               catalog.getPartition(path1, partitionSpec);
+       }
+
+       @Test
+       public void testGetPartition_PartitionNotExist() throws Exception {
+               catalog.createTable(path1, createPartitionedTable(), false);
+
+               CatalogPartitionSpec partitionSpec = createPartitionSpec();
+               exception.expect(PartitionNotExistException.class);
+               exception.expectMessage(
+                       String.format("Partition %s of table %s in catalog %s does not exist.",
+                               partitionSpec, path1.getFullName(), TEST_CATALOG_NAME));
+               catalog.getPartition(path1, partitionSpec);
+       }
+
+       @Test
+       public void testPartitionExists() throws Exception {
+               catalog.createDatabase(db1, createDb(), false);
+               catalog.createTable(path1, createPartitionedTable(), false);
+               catalog.createPartition(path1, createPartitionSpec(), createPartition(), false);
+
+               assertTrue(catalog.partitionExists(path1, createPartitionSpec()));
+               assertFalse(catalog.partitionExists(path2, createPartitionSpec()));
+               assertFalse(catalog.partitionExists(ObjectPath.fromString("non.exist"), createPartitionSpec()));
+       }
+
+       @Test
+       public void testListPartitionPartialSpec() throws Exception {
+               catalog.createDatabase(db1, createDb(), false);
+               catalog.createTable(path1, createPartitionedTable(), false);
+               catalog.createPartition(path1, createPartitionSpec(), createPartition(), false);
+               catalog.createPartition(path1, createAnotherPartitionSpec(), createPartition(), false);
+
+               assertEquals(2, catalog.listPartitions(path1, createPartitionSpecSubset()).size());
+               assertEquals(1, catalog.listPartitions(path1, createAnotherPartitionSpecSubset()).size());
+       }
+
        // ------ utilities ------
 
        /**
@@ -832,6 +1162,13 @@ public abstract class CatalogTestBase {
         */
        protected abstract CatalogFunction createAnotherFunction();
 
+       /**
+        * Creates a CatalogPartition by specific catalog implementation.
+        *
+        * @return a CatalogPartition
+        */
+       public abstract CatalogPartition createPartition();
+
        protected TableSchema createTableSchema() {
                return new TableSchema(
                        new String[] {"first", "second", "third"},
@@ -858,6 +1195,44 @@ public abstract class CatalogTestBase {
                return Arrays.asList("second", "third");
        }
 
+       protected CatalogPartitionSpec createPartitionSpec() {
+               return new CatalogPartitionSpec(
+                       new HashMap<String, String>() {{
+                               put("third", "2000");
+                               put("second", "bob");
+                       }});
+       }
+
+       protected CatalogPartitionSpec createAnotherPartitionSpec() {
+               return new CatalogPartitionSpec(
+                       new HashMap<String, String>() {{
+                               put("third", "2010");
+                               put("second", "bob");
+                       }});
+       }
+
+       protected CatalogPartitionSpec createPartitionSpecSubset() {
+               return new CatalogPartitionSpec(
+                       new HashMap<String, String>() {{
+                               put("second", "bob");
+                       }});
+       }
+
+       protected CatalogPartitionSpec createAnotherPartitionSpecSubset() {
+               return new CatalogPartitionSpec(
+                       new HashMap<String, String>() {{
+                               put("third", "2000");
+                       }}
+               );
+       }
+
+       protected CatalogPartitionSpec createInvalidPartitionSpecSubset() {
+               return new CatalogPartitionSpec(
+                       new HashMap<String, String>() {{
+                               put("third", "2010");
+                       }});
+       }
+
        protected Map<String, String> getBatchTableProperties() {
                return new HashMap<String, String>() {{
                        put(IS_STREAMING, "false");
@@ -1032,4 +1407,8 @@ public abstract class CatalogTestBase {
                assertEquals(f1.getClassName(), f2.getClassName());
                assertEquals(f1.getProperties(), f2.getProperties());
        }
+
+       protected void checkEquals(CatalogPartition expected, CatalogPartition actual) {
+               assertEquals(expected.getProperties(), actual.getProperties());
+       }
 }
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java
index 2b98091..2b7593e 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java
@@ -50,10 +50,6 @@ public class CatalogTestUtil {
                assertEquals(d1.getProperties(), d2.getProperties());
        }
 
-       public static void checkEquals(CatalogPartition p1, CatalogPartition p2) {
-               assertEquals(p1.getProperties(), p2.getProperties());
-       }
-
        static void checkEquals(CatalogTableStatistics ts1, CatalogTableStatistics ts2) {
                assertEquals(ts1.getRowCount(), ts2.getRowCount());
                assertEquals(ts1.getFileCount(), ts2.getFileCount());

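As the updated javadoc in Catalog.java above states, getPartition now throws PartitionNotExistException whenever the requested partition cannot be returned, and the new CatalogTestBase tests exercise the same exception for non-partitioned tables and for partition specs that do not match the table's partition keys. A minimal caller-side sketch of that contract follows; PartitionLookup and tryGetPartition are illustrative names rather than part of this commit, and the single catch clause assumes CatalogException remains an unchecked exception as the javadoc implies.

    import java.util.Optional;

    import org.apache.flink.table.catalog.Catalog;
    import org.apache.flink.table.catalog.CatalogPartition;
    import org.apache.flink.table.catalog.CatalogPartitionSpec;
    import org.apache.flink.table.catalog.ObjectPath;
    import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;

    /** Illustrative helper, not part of this commit. */
    public final class PartitionLookup {

        private PartitionLookup() {
        }

        /**
         * Looks up a partition and maps PartitionNotExistException to Optional.empty().
         * Per the updated javadoc, that exception also covers non-partitioned tables
         * and partition specs that do not match the table's partition keys.
         */
        public static Optional<CatalogPartition> tryGetPartition(
                Catalog catalog, ObjectPath tablePath, CatalogPartitionSpec spec) {
            try {
                return Optional.of(catalog.getPartition(tablePath, spec));
            } catch (PartitionNotExistException e) {
                // Missing partition, non-partitioned table, or mismatched spec.
                return Optional.empty();
            }
        }
    }

For example, tryGetPartition(catalog, ObjectPath.fromString("db1.t1"), spec).isPresent() mirrors what the new partitionExists tests assert, without a second metastore round trip.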