This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 6cbc9bf  [FLINK-11518] [table] Add partition related catalog APIs and 
implement them in GenericInMemoryCatalog
6cbc9bf is described below

commit 6cbc9bf2cd6142c34074deaec56bb339d30b3b93
Author: Bowen L <[email protected]>
AuthorDate: Thu Apr 25 04:48:31 2019 -0700

    [FLINK-11518] [table] Add partition related catalog APIs and implement them 
in GenericInMemoryCatalog
    
    This closes #8222
---
 .../table/catalog/GenericCatalogPartition.java     |  61 +++
 .../flink/table/catalog/GenericCatalogTable.java   |  45 ++-
 .../table/catalog/GenericInMemoryCatalog.java      | 157 ++++++++
 .../flink/table/catalog/CatalogTestUtil.java       |  23 +-
 .../table/catalog/GenericInMemoryCatalogTest.java  | 446 +++++++++++++++++++++
 .../{CatalogTable.java => CatalogPartition.java}   |  36 +-
 .../flink/table/catalog/CatalogPartitionSpec.java  |  75 ++++
 .../apache/flink/table/catalog/CatalogTable.java   |  16 +
 .../flink/table/catalog/ReadableCatalog.java       |  59 +++
 .../table/catalog/ReadableWritableCatalog.java     |  76 +++-
 .../PartitionAlreadyExistsException.java           |  46 +++
 .../PartitionNotExistException.java}               |  32 +-
 .../exceptions/PartitionSpecInvalidException.java  |  52 +++
 .../TableNotPartitionedException.java}             |  23 +-
 14 files changed, 1100 insertions(+), 47 deletions(-)

diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogPartition.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogPartition.java
new file mode 100644
index 0000000..085278e
--- /dev/null
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogPartition.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * A generic catalog partition implementation that holds its properties and
+ * an optional comment in memory.
+ */
+public class GenericCatalogPartition implements CatalogPartition {
+       // Comment used when the caller does not supply one.
+       private static final String DEFAULT_COMMENT = "This is a generic catalog partition";
+
+       private final Map<String, String> properties;
+
+       private final String comment;
+
+       /**
+        * Creates a partition with the given properties and the default comment.
+        *
+        * @param properties properties of the partition
+        */
+       public GenericCatalogPartition(Map<String, String> properties) {
+               this(properties, DEFAULT_COMMENT);
+       }
+
+       /**
+        * Creates a partition with the given properties and comment.
+        *
+        * @param properties properties of the partition
+        * @param comment comment reported by {@link #getDescription()}
+        */
+       public GenericCatalogPartition(Map<String, String> properties, String comment) {
+               this.properties = properties;
+               this.comment = comment;
+       }
+
+       @Override
+       public Map<String, String> getProperties() {
+               return properties;
+       }
+
+       /**
+        * Returns a new partition backed by a fresh copy of the properties map
+        * and carrying the same comment as this one.
+        */
+       @Override
+       public CatalogPartition copy() {
+               // The comment must be passed along explicitly; the single-argument
+               // constructor would reset it to the default.
+               return new GenericCatalogPartition(new HashMap<>(properties), comment);
+       }
+
+       @Override
+       public Optional<String> getDescription() {
+               // ofNullable: a partition built with a null comment reports "no description"
+               // instead of failing with a NullPointerException.
+               return Optional.ofNullable(comment);
+       }
+
+       @Override
+       public Optional<String> getDetailedDescription() {
+               return Optional.of("This is a generic catalog partition with detailed description");
+       }
+}
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogTable.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogTable.java
index 4b1be8f..47498e7 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogTable.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogTable.java
@@ -21,7 +21,9 @@ package org.apache.flink.table.catalog;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.plan.stats.TableStats;
 
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 
@@ -33,23 +35,34 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
 public class GenericCatalogTable implements CatalogTable {
        // Schema of the table (column names and types)
        private final TableSchema tableSchema;
-       // Properties of the table
-       private final Map<String, String> properties;
        // Statistics of the table
        private final TableStats tableStats;
+       // Partition keys if this is a partitioned table. It's an empty list if 
the table is not partitioned
+       private final List<String> partitionKeys;
+       // Properties of the table
+       private final Map<String, String> properties;
        // Comment of the table
        private String comment = "This is a generic catalog table.";
 
-       public GenericCatalogTable(TableSchema tableSchema, TableStats 
tableStats, Map<String, String> properties) {
-               this.tableSchema = tableSchema;
-               this.tableStats = tableStats;
+       public GenericCatalogTable(
+                       TableSchema tableSchema,
+                       TableStats tableStats,
+                       List<String> partitionKeys,
+                       Map<String, String> properties,
+                       String comment) {
+               this.tableSchema = checkNotNull(tableSchema, "tableSchema 
cannot be null");
+               this.tableStats = checkNotNull(tableStats, "tableStats cannot 
be null");
+               this.partitionKeys = checkNotNull(partitionKeys, "partitionKeys 
cannot be null");
                this.properties = checkNotNull(properties, "properties cannot 
be null");
+               this.comment = comment;
        }
 
-       public GenericCatalogTable(TableSchema tableSchema, TableStats 
tableStats, Map<String, String> properties,
-               String comment) {
-               this(tableSchema, tableStats, properties);
-               this.comment = comment;
+       public GenericCatalogTable(
+                       TableSchema tableSchema,
+                       TableStats tableStats,
+                       Map<String, String> properties,
+                       String comment) {
+               this(tableSchema, tableStats, new ArrayList<>(), properties, 
comment);
        }
 
        @Override
@@ -58,6 +71,16 @@ public class GenericCatalogTable implements CatalogTable {
        }
 
        @Override
+       public boolean isPartitioned() {
+               return !partitionKeys.isEmpty();
+       }
+
+       @Override
+       public List<String> getPartitionKeys() {
+               return partitionKeys;
+       }
+
+       @Override
        public Map<String, String> getProperties() {
                return properties;
        }
@@ -69,8 +92,8 @@ public class GenericCatalogTable implements CatalogTable {
 
        @Override
        public GenericCatalogTable copy() {
-               return new GenericCatalogTable(this.tableSchema.copy(), 
this.tableStats.copy(),
-                       new HashMap<>(this.properties), comment);
+               return new GenericCatalogTable(
+                       this.tableSchema.copy(), this.tableStats.copy(), new 
ArrayList<>(partitionKeys), new HashMap<>(this.properties), comment);
        }
 
        @Override
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
index dcdfd9a..6a4e554 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
@@ -18,11 +18,16 @@
 
 package org.apache.flink.table.catalog;
 
+import org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import 
org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
 import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
 import org.apache.flink.util.StringUtils;
 
 import java.util.ArrayList;
@@ -46,6 +51,7 @@ public class GenericInMemoryCatalog implements 
ReadableWritableCatalog {
        private final String catalogName;
        private final Map<String, CatalogDatabase> databases;
        private final Map<ObjectPath, CatalogBaseTable> tables;
+       private final Map<ObjectPath, Map<CatalogPartitionSpec, 
CatalogPartition>> partitions;
 
        public GenericInMemoryCatalog(String name) {
                checkArgument(!StringUtils.isNullOrWhitespaceOnly(name), "name 
cannot be null or empty");
@@ -54,6 +60,7 @@ public class GenericInMemoryCatalog implements 
ReadableWritableCatalog {
                this.databases = new LinkedHashMap<>();
                this.databases.put(DEFAULT_DB, new GenericCatalogDatabase(new 
HashMap<>()));
                this.tables = new LinkedHashMap<>();
+               this.partitions = new LinkedHashMap<>();
        }
 
        @Override
@@ -178,6 +185,10 @@ public class GenericInMemoryCatalog implements 
ReadableWritableCatalog {
                        }
                } else {
                        tables.put(tablePath, table.copy());
+
+                       if ((table instanceof CatalogTable) && ((CatalogTable) 
table).isPartitioned()) {
+                               partitions.put(tablePath, new 
LinkedHashMap<>());
+                       }
                }
        }
 
@@ -187,6 +198,10 @@ public class GenericInMemoryCatalog implements 
ReadableWritableCatalog {
                checkArgument(tablePath != null);
                checkArgument(newTable != null);
 
+               // TODO: validate that the new and old CatalogBaseTable are of 
the same type. For example, this should not
+               //              allow altering a regular table into a partitioned 
table, or altering a view into a table, and vice versa.
+               //              Also add unit tests.
+
                if (tableExists(tablePath)) {
                        tables.put(tablePath, newTable.copy());
                } else if (!ignoreIfNotExists) {
@@ -202,6 +217,8 @@ public class GenericInMemoryCatalog implements 
ReadableWritableCatalog {
 
                if (tableExists(tablePath)) {
                        tables.remove(tablePath);
+
+                       partitions.remove(tablePath);
                } else if (!ignoreIfNotExists) {
                        throw new TableNotExistException(catalogName, 
tablePath);
                }
@@ -220,6 +237,10 @@ public class GenericInMemoryCatalog implements 
ReadableWritableCatalog {
                                throw new 
TableAlreadyExistException(catalogName, newPath);
                        } else {
                                tables.put(newPath, tables.remove(tablePath));
+
+                               if (partitions.containsKey(tablePath)) {
+                                       partitions.put(newPath, 
partitions.remove(tablePath));
+                               }
                        }
                } else if (!ignoreIfNotExists) {
                        throw new TableNotExistException(catalogName, 
tablePath);
@@ -269,4 +290,140 @@ public class GenericInMemoryCatalog implements 
ReadableWritableCatalog {
                return tablePath != null && 
databaseExists(tablePath.getDatabaseName()) && tables.containsKey(tablePath);
        }
 
+       // ------ partitions ------
+
+       /**
+        * Stores a copy of {@code partition} under {@code partitionSpec} for the given table.
+        *
+        * <p>The spec must name every partition key of the table. If the partition already
+        * exists, nothing is stored; an exception is raised unless {@code ignoreIfExists} is set.
+        *
+        * @throws PartitionSpecInvalidException if the spec does not match the table's partition
+        *         keys or covers only some of them
+        * @throws PartitionAlreadyExistsException if the partition exists and {@code ignoreIfExists} is false
+        */
+       @Override
+       public void createPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition partition, boolean ignoreIfExists)
+               throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, PartitionAlreadyExistsException, CatalogException {
+
+               // Fail fast: otherwise a null partition is silently accepted whenever the
+               // partition already exists and ignoreIfExists is true.
+               checkArgument(tablePath != null);
+               checkArgument(partitionSpec != null);
+               checkArgument(partition != null);
+
+               CatalogTable table = validatePartitionSpec(tablePath, partitionSpec);
+
+               // validatePartitionSpec() also accepts a leading subset of the keys, but
+               // getPartition() rejects such specs, so an entry stored under one could
+               // never be read back. Require the full spec here as well.
+               if (partitionSpec.getPartitionSpec().size() < table.getPartitionKeys().size()) {
+                       throw new PartitionSpecInvalidException(catalogName, table.getPartitionKeys(), tablePath, partitionSpec);
+               }
+
+               if (partitionExists(tablePath, partitionSpec)) {
+                       if (!ignoreIfExists) {
+                               throw new PartitionAlreadyExistsException(catalogName, tablePath, partitionSpec);
+                       }
+               } else {
+                       // Store a copy so later changes to the caller's object do not leak in.
+                       partitions.get(tablePath).put(partitionSpec, partition.copy());
+               }
+       }
+
+       @Override
+       public void dropPartition(ObjectPath tablePath, CatalogPartitionSpec 
partitionSpec, boolean ignoreIfNotExists)
+               throws TableNotExistException, TableNotPartitionedException, 
PartitionSpecInvalidException, PartitionNotExistException, CatalogException {
+
+               validatePartitionSpec(tablePath, partitionSpec);
+
+               if (partitionExists(tablePath, partitionSpec)) {
+                       partitions.get(tablePath).remove(partitionSpec);
+               } else if (!ignoreIfNotExists) {
+                       throw new PartitionNotExistException(catalogName, 
tablePath, partitionSpec);
+               }
+       }
+
+       /**
+        * Replaces the partition stored under {@code partitionSpec} with a copy of
+        * {@code newPartition}. A missing partition raises {@link PartitionNotExistException}
+        * unless {@code ignoreIfNotExists} is set.
+        */
+       @Override
+       public void alterPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition newPartition, boolean ignoreIfNotExists)
+               throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, PartitionNotExistException, CatalogException {
+
+               // Fail fast: otherwise a null newPartition goes unnoticed whenever the
+               // partition is missing and ignoreIfNotExists is true.
+               checkArgument(tablePath != null);
+               checkArgument(partitionSpec != null);
+               checkArgument(newPartition != null);
+
+               validatePartitionSpec(tablePath, partitionSpec);
+
+               if (partitionExists(tablePath, partitionSpec)) {
+                       // Store a copy so later changes to the caller's object do not leak in.
+                       partitions.get(tablePath).put(partitionSpec, newPartition.copy());
+               } else if (!ignoreIfNotExists) {
+                       throw new PartitionNotExistException(catalogName, tablePath, partitionSpec);
+               }
+       }
+
+       /**
+        * Lists the specs of all partitions registered for the given partitioned table.
+        */
+       @Override
+       public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath)
+               throws TableNotExistException, TableNotPartitionedException, CatalogException {
+
+               // Throws unless the path resolves to a partitioned table.
+               validatePartitionedTable(tablePath);
+
+               // Return a snapshot rather than a view of the catalog's own key set.
+               Map<CatalogPartitionSpec, CatalogPartition> tablePartitions = partitions.get(tablePath);
+               return new ArrayList<>(tablePartitions.keySet());
+       }
+
+       /**
+        * Lists the specs of all partitions of the given table whose key/value pairs
+        * include every pair of {@code partitionSpec}.
+        */
+       @Override
+       public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+               throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, CatalogException {
+
+               validatePartitionSpec(tablePath, partitionSpec);
+
+               // The pairs every returned partition has to contain.
+               Map<String, String> requested = partitionSpec.getPartitionSpec();
+
+               return partitions.get(tablePath).keySet().stream()
+                       .filter(candidate -> candidate.getPartitionSpec().entrySet().containsAll(requested.entrySet()))
+                       .collect(Collectors.toList());
+       }
+
+       /**
+        * Returns a copy of the partition stored under {@code partitionSpec}.
+        * The spec must cover every partition key of the table.
+        */
+       @Override
+       public CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+               throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, PartitionNotExistException, CatalogException {
+
+               CatalogTable partitionedTable = validatePartitionSpec(tablePath, partitionSpec);
+               List<String> keys = partitionedTable.getPartitionKeys();
+
+               // A spec shorter than the key list cannot single out one partition.
+               if (keys.size() > partitionSpec.getPartitionSpec().size()) {
+                       throw new PartitionSpecInvalidException(catalogName, keys, tablePath, partitionSpec);
+               }
+
+               if (!partitionExists(tablePath, partitionSpec)) {
+                       throw new PartitionNotExistException(catalogName, tablePath, partitionSpec);
+               }
+
+               // Callers get a copy, not the stored instance.
+               return partitions.get(tablePath).get(partitionSpec).copy();
+       }
+
+       /**
+        * Tells whether a partition is registered under {@code partitionSpec} for the given table.
+        */
+       @Override
+       public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+               throws CatalogException {
+
+               // No entry for the path means no partitions were ever registered for it.
+               Map<CatalogPartitionSpec, CatalogPartition> tablePartitions = partitions.get(tablePath);
+
+               return tablePartitions != null && tablePartitions.containsKey(partitionSpec);
+       }
+
+       /**
+        * Checks that the path names a partitioned table and that the spec fits its partition keys:
+        * the spec may not have more entries than there are keys, and with n entries it must
+        * contain the first n partition keys.
+        *
+        * @return the validated partitioned table
+        */
+       private CatalogTable validatePartitionSpec(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+               throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException {
+
+               CatalogTable partitionedTable = validatePartitionedTable(tablePath);
+
+               List<String> keys = partitionedTable.getPartitionKeys();
+               Map<String, String> specEntries = partitionSpec.getPartitionSpec();
+               int specSize = specEntries.size();
+
+               // A spec can never name more keys than the table is partitioned by.
+               if (specSize > keys.size()) {
+                       throw new PartitionSpecInvalidException(catalogName, keys, tablePath, partitionSpec);
+               }
+
+               // Every one of the leading 'specSize' partition keys has to appear in the spec.
+               for (String leadingKey : keys.subList(0, specSize)) {
+                       if (!specEntries.containsKey(leadingKey)) {
+                               throw new PartitionSpecInvalidException(catalogName, keys, tablePath, partitionSpec);
+                       }
+               }
+
+               return partitionedTable;
+       }
+
+       /**
+        * Looks up the table at the given path and checks that it is a partitioned {@link CatalogTable}.
+        *
+        * @return the table, cast to {@link CatalogTable}
+        * @throws CatalogException if the object at the path is not a {@link CatalogTable}
+        * @throws TableNotPartitionedException if the table has no partition keys
+        */
+       private CatalogTable validatePartitionedTable(ObjectPath tablePath)
+               throws TableNotExistException, TableNotPartitionedException {
+
+               CatalogBaseTable candidate = getTable(tablePath);
+
+               if (candidate instanceof CatalogTable) {
+                       CatalogTable catalogTable = (CatalogTable) candidate;
+
+                       if (catalogTable.isPartitioned()) {
+                               return catalogTable;
+                       }
+
+                       throw new TableNotPartitionedException(catalogName, tablePath);
+               }
+
+               // Some other kind of CatalogBaseTable, which cannot carry partitions here.
+               throw new CatalogException(
+                       String.format("%s in Catalog %s is not a CatalogTable", tablePath.getFullName(), catalogName));
+       }
 }
diff --git 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java
 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java
index 69ccb63..9974272 100644
--- 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java
+++ 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java
@@ -25,6 +25,7 @@ import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.plan.stats.TableStats;
 
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
@@ -36,7 +37,7 @@ public class CatalogTestUtil {
 
        public static GenericCatalogTable createTable(String comment) {
                TableSchema tableSchema = 
TableSchema.fromTypeInfo(getRowTypeInfo());
-               return createTable(tableSchema, createTableStats(), new 
HashMap<String, String>(), comment);
+               return new GenericCatalogTable(tableSchema, createTableStats(), 
new HashMap<>(), comment);
        }
 
        public static RowTypeInfo getRowTypeInfo() {
@@ -49,14 +50,21 @@ public class CatalogTestUtil {
                return new TableStats(2);
        }
 
-       public static GenericCatalogTable createTable(TableSchema schema, 
Map<String, String> tableProperties,
+       public static GenericCatalogTable createTable(
+               TableSchema schema,
+               Map<String, String> tableProperties,
                String comment) {
-               return createTable(schema, new TableStats(0), tableProperties, 
comment);
+
+               return new GenericCatalogTable(schema, new TableStats(0), 
tableProperties, comment);
        }
 
-       public static GenericCatalogTable createTable(TableSchema schema, 
TableStats stats,
-               Map<String, String> tableProperties, String comment) {
-               return new GenericCatalogTable(schema, stats, tableProperties, 
comment);
+       public static GenericCatalogTable createPartitionedTable(
+               TableSchema schema,
+               List<String> partitionKeys,
+               Map<String, String> tableProperties,
+               String comment) {
+
+               return new GenericCatalogTable(schema, new TableStats(0), 
partitionKeys, tableProperties, comment);
        }
 
        public static void checkEquals(GenericCatalogTable t1, 
GenericCatalogTable t2) {
@@ -79,4 +87,7 @@ public class CatalogTestUtil {
                assertEquals(d1.getProperties(), d2.getProperties());
        }
 
+       protected static void checkEquals(CatalogPartition p1, CatalogPartition 
p2) {
+               assertEquals(p1.getProperties(), p2.getProperties());
+       }
 }
diff --git 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
index f679238..ad9fd92 100644
--- 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
+++ 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
@@ -24,8 +24,12 @@ import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import 
org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
 import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
 
 import org.junit.After;
 import org.junit.Before;
@@ -42,6 +46,7 @@ import java.util.Map;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -115,10 +120,12 @@ public class GenericInMemoryCatalogTest {
                CatalogDatabase database = catalog.getDatabase(db1);
                
assertTrue(TEST_COMMENT.equals(database.getDescription().get()));
 
+               // Non-partitioned table
                GenericCatalogTable table = createTable();
                catalog.createTable(path1, table, false);
 
                CatalogBaseTable tableCreated = catalog.getTable(path1);
+
                CatalogTestUtil.checkEquals(table, (GenericCatalogTable) 
tableCreated);
                assertEquals(TABLE_COMMENT, 
tableCreated.getDescription().get());
 
@@ -128,6 +135,17 @@ public class GenericInMemoryCatalogTest {
                assertEquals(path1.getObjectName(), tables.get(0));
 
                catalog.dropTable(path1, false);
+
+               // Partitioned table
+               table = createPartitionedTable();
+               catalog.createTable(path1, table, false);
+
+               CatalogTestUtil.checkEquals(table, (GenericCatalogTable) 
catalog.getTable(path1));
+
+               tables = catalog.listTables(db1);
+
+               assertEquals(1, tables.size());
+               assertEquals(path1.getObjectName(), tables.get(0));
        }
 
        @Test
@@ -191,6 +209,19 @@ public class GenericInMemoryCatalogTest {
                catalog.dropTable(path1, false);
 
                assertFalse(catalog.tableExists(path1));
+
+               // Partitioned table
+               catalog.createTable(path1, createPartitionedTable(), false);
+               CatalogPartition catalogPartition = createPartition();
+               CatalogPartitionSpec catalogPartitionSpec = 
createPartitionSpec();
+               catalog.createPartition(path1, catalogPartitionSpec, 
catalogPartition, false);
+
+               assertTrue(catalog.tableExists(path1));
+
+               catalog.dropTable(path1, false);
+
+               assertFalse(catalog.tableExists(path1));
+               assertFalse(catalog.partitionExists(path1, 
catalogPartitionSpec));
        }
 
        @Test
@@ -240,6 +271,17 @@ public class GenericInMemoryCatalogTest {
                CatalogTestUtil.checkEquals(newTable, (GenericCatalogTable) 
catalog.getTable(path1));
 
                catalog.dropTable(path1, false);
+
+               // Partitioned table
+               table = createPartitionedTable();
+               catalog.createTable(path1, table, false);
+
+               CatalogTestUtil.checkEquals(table, (GenericCatalogTable) 
catalog.getTable(path1));
+
+               newTable = createAnotherPartitionedTable();
+               catalog.alterTable(path1, newTable, false);
+
+               CatalogTestUtil.checkEquals(newTable, (GenericCatalogTable) 
catalog.getTable(path1));
        }
 
        @Test
@@ -260,6 +302,8 @@ public class GenericInMemoryCatalogTest {
        @Test
        public void testRenameTable() throws Exception {
                catalog.createDatabase(db1, createDb(), false);
+
+               // Non-partitioned table
                GenericCatalogTable table = createTable();
                catalog.createTable(path1, table, false);
 
@@ -269,6 +313,25 @@ public class GenericInMemoryCatalogTest {
 
                CatalogTestUtil.checkEquals(table, (GenericCatalogTable) 
catalog.getTable(path3));
                assertFalse(catalog.tableExists(path1));
+
+               catalog.dropTable(path3, false);
+
+               // Partitioned table
+               table = createPartitionedTable();
+               catalog.createTable(path1, table, false);
+               CatalogPartition catalogPartition = createPartition();
+               CatalogPartitionSpec catalogPartitionSpec = 
createPartitionSpec();
+               catalog.createPartition(path1, catalogPartitionSpec, 
catalogPartition, false);
+
+               CatalogTestUtil.checkEquals(table, (GenericCatalogTable) 
catalog.getTable(path1));
+               assertTrue(catalog.partitionExists(path1, 
catalogPartitionSpec));
+
+               catalog.renameTable(path1, t2, false);
+
+               CatalogTestUtil.checkEquals(table, (GenericCatalogTable) 
catalog.getTable(path3));
+               assertTrue(catalog.partitionExists(path3, 
catalogPartitionSpec));
+               assertFalse(catalog.tableExists(path1));
+               assertFalse(catalog.partitionExists(path1, 
catalogPartitionSpec));
        }
 
        @Test
@@ -562,6 +625,327 @@ public class GenericInMemoryCatalogTest {
                catalog.dropTable(viewPath2, false);
        }
 
+       // ------ partitions ------
+
+       @Test
+       public void testCreatePartition() throws Exception {
+               CatalogTable table = createPartitionedTable();
+               catalog.createDatabase(db1, createDb(), false);
+               catalog.createTable(path1, table, false);
+
+               assertTrue(catalog.listPartitions(path1).isEmpty());
+
+               CatalogPartitionSpec partitionSpec = createPartitionSpec();
+               catalog.createPartition(path1, partitionSpec, 
createPartition(), false);
+
+               assertEquals(Arrays.asList(partitionSpec), 
catalog.listPartitions(path1));
+               assertEquals(Arrays.asList(partitionSpec), 
catalog.listPartitions(path1, createPartitionSpecSubset()));
+               CatalogTestUtil.checkEquals(createPartition(), 
catalog.getPartition(path1, createPartitionSpec()));
+
+               CatalogPartitionSpec anotherPartitionSpec = 
createAnotherPartitionSpec();
+               CatalogPartition anotherPartition = createAnotherPartition();
+               catalog.createPartition(path1, anotherPartitionSpec, 
anotherPartition, false);
+
+               assertEquals(Arrays.asList(partitionSpec, 
anotherPartitionSpec), catalog.listPartitions(path1));
+               assertEquals(Arrays.asList(partitionSpec, 
anotherPartitionSpec), catalog.listPartitions(path1, 
createPartitionSpecSubset()));
+               CatalogTestUtil.checkEquals(anotherPartition, 
catalog.getPartition(path1, anotherPartitionSpec));
+
+               CatalogPartitionSpec invalid = 
createInvalidPartitionSpecSubset();
+               exception.expect(PartitionSpecInvalidException.class);
+               exception.expectMessage(
+                       String.format("PartitionSpec %s does not match 
partition keys %s of table %s in catalog %s",
+                               invalid, table.getPartitionKeys(), 
path1.getFullName(), testCatalogName));
+               catalog.listPartitions(path1, invalid);
+       }
+
+       @Test
+       public void testCreatePartition_TableNotExistException() throws 
Exception {
+               catalog.createDatabase(db1, createDb(), false);
+
+               exception.expect(TableNotExistException.class);
+               exception.expectMessage(
+                       String.format("Table (or view) %s does not exist in 
Catalog %s.", path1.getFullName(), testCatalogName));
+               catalog.createPartition(path1, createPartitionSpec(), 
createPartition(), false);
+       }
+
+       @Test
+       public void testCreatePartition_TableNotPartitionedException() throws 
Exception {
+               catalog.createDatabase(db1, createDb(), false);
+               catalog.createTable(path1, createTable(), false);
+
+               exception.expect(TableNotPartitionedException.class);
+               exception.expectMessage(
+                       String.format("Table %s in catalog %s is not 
partitioned.", path1.getFullName(), testCatalogName));
+               catalog.createPartition(path1, createPartitionSpec(), 
createPartition(), false);
+       }
+
+       @Test
+       public void testCreatePartition_PartitionSpecInvalidException() throws 
Exception {
+               catalog.createDatabase(db1, createDb(), false);
+               CatalogTable table = createPartitionedTable();
+               catalog.createTable(path1, table, false);
+
+               CatalogPartitionSpec partitionSpec = 
createInvalidPartitionSpecSubset();
+               exception.expect(PartitionSpecInvalidException.class);
+               exception.expectMessage(
+                       String.format("PartitionSpec %s does not match 
partition keys %s of table %s in catalog %s.",
+                               partitionSpec, table.getPartitionKeys(), 
path1.getFullName(), testCatalogName));
+               catalog.createPartition(path1, partitionSpec, 
createPartition(), false);
+       }
+
+       @Test
+       public void testCreatePartition_PartitionAlreadyExistsException() 
throws Exception {
+               catalog.createDatabase(db1, createDb(), false);
+               catalog.createTable(path1, createPartitionedTable(), false);
+               CatalogPartition partition = createPartition();
+               catalog.createPartition(path1, createPartitionSpec(), 
partition, false);
+
+               CatalogPartitionSpec partitionSpec = createPartitionSpec();
+
+               exception.expect(PartitionAlreadyExistsException.class);
+               exception.expectMessage(
+                       String.format("Partition %s of table %s in catalog %s 
already exists.",
+                               partitionSpec, path1.getFullName(), 
testCatalogName));
+               catalog.createPartition(path1, partitionSpec, 
createPartition(), false);
+       }
+
+       @Test
+       public void testCreatePartition_PartitionAlreadyExists_ignored() throws 
Exception {
+               catalog.createDatabase(db1, createDb(), false);
+               catalog.createTable(path1, createPartitionedTable(), false);
+
+               CatalogPartitionSpec partitionSpec = createPartitionSpec();
+               catalog.createPartition(path1, partitionSpec, 
createPartition(), false);
+               catalog.createPartition(path1, partitionSpec, 
createPartition(), true);
+       }
+
+       @Test
+       public void testDropPartition() throws Exception {
+               catalog.createDatabase(db1, createDb(), false);
+               catalog.createTable(path1, createPartitionedTable(), false);
+               catalog.createPartition(path1, createPartitionSpec(), 
createPartition(), false);
+
+               assertEquals(Arrays.asList(createPartitionSpec()), 
catalog.listPartitions(path1));
+
+               catalog.dropPartition(path1, createPartitionSpec(), false);
+
+               assertEquals(Arrays.asList(), catalog.listPartitions(path1));
+       }
+
+       @Test
+       public void testDropPartition_TableNotExistException() throws Exception 
{
+               catalog.createDatabase(db1, createDb(), false);
+
+               exception.expect(TableNotExistException.class);
+               exception.expectMessage(
+                       String.format("Table (or view) %s does not exist in 
Catalog %s.", path1.getFullName(), testCatalogName));
+               catalog.dropPartition(path1, createPartitionSpec(), false);
+       }
+
+       @Test
+       public void testDropPartition_TableNotPartitionedException() throws 
Exception {
+               catalog.createDatabase(db1, createDb(), false);
+               catalog.createTable(path1, createTable(), false);
+
+               exception.expect(TableNotPartitionedException.class);
+               exception.expectMessage(
+                       String.format("Table %s in catalog %s is not 
partitioned.", path1.getFullName(), testCatalogName));
+               catalog.dropPartition(path1, createPartitionSpec(), false);
+       }
+
+       @Test
+       public void testDropPartition_PartitionSpecInvalidException() throws 
Exception {
+               catalog.createDatabase(db1, createDb(), false);
+               CatalogTable table = createPartitionedTable();
+               catalog.createTable(path1, table, false);
+
+               CatalogPartitionSpec partitionSpec = 
createInvalidPartitionSpecSubset();
+               exception.expect(PartitionSpecInvalidException.class);
+               exception.expectMessage(
+                       String.format("PartitionSpec %s does not match 
partition keys %s of table %s in catalog %s.",
+                               partitionSpec, table.getPartitionKeys(), 
path1.getFullName(), testCatalogName));
+               catalog.dropPartition(path1, partitionSpec, false);
+       }
+
+       @Test
+       public void testDropPartition_PartitionNotExistException() throws 
Exception {
+               catalog.createDatabase(db1, createDb(), false);
+               catalog.createTable(path1, createPartitionedTable(), false);
+
+               CatalogPartitionSpec partitionSpec = createPartitionSpec();
+               exception.expect(PartitionNotExistException.class);
+               exception.expectMessage(
+                       String.format("Partition %s of table %s in catalog %s 
does not exist.", partitionSpec, path1.getFullName(), testCatalogName));
+               catalog.dropPartition(path1, partitionSpec, false);
+       }
+
+       @Test
+       public void testDropPartition_PartitionNotExist_ignored() throws 
Exception {
+               catalog.createDatabase(db1, createDb(), false);
+               catalog.createTable(path1, createPartitionedTable(), false);
+               catalog.dropPartition(path1, createPartitionSpec(), true);
+       }
+
+       @Test
+       public void testAlterPartition() throws Exception {
+               catalog.createDatabase(db1, createDb(), false);
+               catalog.createTable(path1, createPartitionedTable(), false);
+
+               CatalogPartitionSpec partitionSpec = createPartitionSpec();
+               catalog.createPartition(path1, partitionSpec, 
createPartition(), false);
+
+               assertEquals(Arrays.asList(partitionSpec), 
catalog.listPartitions(path1));
+
+               CatalogPartition cp = catalog.getPartition(path1, 
createPartitionSpec());
+               CatalogTestUtil.checkEquals(createPartition(), cp);
+
+               assertNull(cp.getProperties().get("k"));
+
+               Map<String, String> partitionProperties = 
getBatchTableProperties();
+               partitionProperties.put("k", "v");
+
+               CatalogPartition another = createPartition(partitionProperties);
+               catalog.alterPartition(path1, createPartitionSpec(), another, 
false);
+
+               assertEquals(Arrays.asList(createPartitionSpec()), 
catalog.listPartitions(path1));
+
+               cp = catalog.getPartition(path1, createPartitionSpec());
+               CatalogTestUtil.checkEquals(another, cp);
+
+               assertEquals("v", cp.getProperties().get("k"));
+       }
+
+       @Test
+       public void testAlterPartition_TableNotExistException() throws 
Exception {
+               catalog.createDatabase(db1, createDb(), false);
+
+               CatalogPartitionSpec partitionSpec = createPartitionSpec();
+               exception.expect(TableNotExistException.class);
+               exception.expectMessage(
+                       String.format("Table (or view) %s does not exist in 
Catalog %s.", path1.getFullName(), testCatalogName));
+               catalog.alterPartition(path1, partitionSpec, createPartition(), 
false);
+       }
+
+       @Test
+       public void testAlterPartition_TableNotPartitionedException() throws 
Exception {
+               catalog.createDatabase(db1, createDb(), false);
+               catalog.createTable(path1, createTable(), false);
+
+               CatalogPartitionSpec partitionSpec = createPartitionSpec();
+               exception.expect(TableNotPartitionedException.class);
+               exception.expectMessage(
+                       String.format("Table %s in catalog %s is not 
partitioned.", path1.getFullName(), testCatalogName));
+               catalog.alterPartition(path1, partitionSpec, createPartition(), 
false);
+       }
+
+       @Test
+       public void testAlterPartition_PartitionSpecInvalidException() throws 
Exception {
+               catalog.createDatabase(db1, createDb(), false);
+               CatalogTable table = createPartitionedTable();
+               catalog.createTable(path1, table, false);
+
+               CatalogPartitionSpec partitionSpec = 
createInvalidPartitionSpecSubset();
+               exception.expect(PartitionSpecInvalidException.class);
+               exception.expectMessage(
+                       String.format("PartitionSpec %s does not match 
partition keys %s of table %s in catalog %s.",
+                               partitionSpec, table.getPartitionKeys(), 
path1.getFullName(), testCatalogName));
+               catalog.alterPartition(path1, partitionSpec, createPartition(), 
false);
+       }
+
+       @Test
+       public void testAlterPartition_PartitionNotExistException() throws 
Exception {
+               catalog.createDatabase(db1, createDb(), false);
+               catalog.createTable(path1, createPartitionedTable(), false);
+
+               CatalogPartition catalogPartition = createPartition();
+               CatalogPartitionSpec partitionSpec = createPartitionSpec();
+               exception.expect(PartitionNotExistException.class);
+               exception.expectMessage(
+                       String.format("Partition %s of table %s in catalog %s 
does not exist.",
+                               partitionSpec, path1.getFullName(), 
testCatalogName));
+               catalog.alterPartition(path1, partitionSpec, catalogPartition, 
false);
+       }
+
+       @Test
+       public void testAlterPartition_PartitionNotExist_ignored() throws 
Exception {
+               catalog.createDatabase(db1, createDb(), false);
+               catalog.createTable(path1, createPartitionedTable(), false);
+               catalog.alterPartition(path1, createPartitionSpec(), 
createPartition(), true);
+       }
+
+       @Test
+       public void testGetPartition_TableNotExistException() throws Exception {
+               exception.expect(TableNotExistException.class);
+               catalog.getPartition(path1, createPartitionSpec());
+       }
+
+       @Test
+       public void testGetPartition_TableNotPartitionedException() throws 
Exception {
+               catalog.createDatabase(db1, createDb(), false);
+               catalog.createTable(path1, createTable(), false);
+
+               exception.expect(TableNotPartitionedException.class);
+               exception.expectMessage(
+                       String.format("Table %s in catalog %s is not 
partitioned.", path1.getFullName(), testCatalogName));
+               catalog.getPartition(path1, createPartitionSpec());
+       }
+
+       @Test
+       public void 
testGetPartition_PartitionSpecInvalidException_invalidPartitionSpec() throws 
Exception {
+               catalog.createDatabase(db1, createDb(), false);
+               CatalogTable table = createPartitionedTable();
+               catalog.createTable(path1, table, false);
+
+               CatalogPartitionSpec partitionSpec = 
createInvalidPartitionSpecSubset();
+               exception.expect(PartitionSpecInvalidException.class);
+               exception.expectMessage(
+                       String.format("PartitionSpec %s does not match 
partition keys %s of table %s in catalog %s.",
+                               partitionSpec, table.getPartitionKeys(), 
path1.getFullName(), testCatalogName));
+               catalog.getPartition(path1, partitionSpec);
+       }
+
+       @Test
+       public void 
testGetPartition_PartitionSpecInvalidException_sizeNotEqual() throws Exception {
+               catalog.createDatabase(db1, createDb(), false);
+               CatalogTable table = createPartitionedTable();
+               catalog.createTable(path1, table, false);
+
+               CatalogPartitionSpec partitionSpec = new CatalogPartitionSpec(
+                       new HashMap<String, String>() {{
+                               put("second", "bob");
+                       }}
+               );
+               exception.expect(PartitionSpecInvalidException.class);
+               exception.expectMessage(
+                       String.format("PartitionSpec %s does not match 
partition keys %s of table %s in catalog %s.",
+                               partitionSpec, table.getPartitionKeys(), 
path1.getFullName(), testCatalogName));
+               catalog.getPartition(path1, partitionSpec);
+       }
+
+       @Test
+       public void testGetPartition_PartitionNotExistException() throws 
Exception {
+               catalog.createDatabase(db1, createDb(), false);
+               catalog.createTable(path1, createPartitionedTable(), false);
+
+               CatalogPartitionSpec partitionSpec = createPartitionSpec();
+               exception.expect(PartitionNotExistException.class);
+               exception.expectMessage(
+                       String.format("Partition %s of table %s in catalog %s 
does not exist.",
+                               partitionSpec, path1.getFullName(), 
testCatalogName));
+               catalog.getPartition(path1, partitionSpec);
+       }
+
+       @Test
+       public void testPartitionExists() throws Exception {
+               catalog.createDatabase(db1, createDb(), false);
+               catalog.createTable(path1, createPartitionedTable(), false);
+               catalog.createPartition(path1, createPartitionSpec(), 
createPartition(), false);
+
+               assertTrue(catalog.partitionExists(path1, 
createPartitionSpec()));
+               assertFalse(catalog.partitionExists(path2, 
createPartitionSpec()));
+               
assertFalse(catalog.partitionExists(ObjectPath.fromString("non.exist"), 
createPartitionSpec()));
+       }
+
        // ------ utilities ------
 
        private GenericCatalogTable createStreamingTable() {
@@ -582,6 +966,68 @@ public class GenericInMemoryCatalogTest {
                        getBatchTableProperties(), TABLE_COMMENT);
        }
 
+       protected GenericCatalogTable createPartitionedTable() {
+               return CatalogTestUtil.createPartitionedTable(
+                       createTableSchema(),
+                       createPartitionKeys(),
+                       getBatchTableProperties(),
+                       TABLE_COMMENT);
+       }
+
+       protected GenericCatalogTable createAnotherPartitionedTable() {
+               return CatalogTestUtil.createPartitionedTable(
+                       createAnotherTableSchema(),
+                       createPartitionKeys(),
+                       getBatchTableProperties(),
+                       TABLE_COMMENT);
+       }
+
+       private List<String> createPartitionKeys() {
+               return Arrays.asList("second", "third");
+       }
+
+       private CatalogPartitionSpec createPartitionSpec() {
+               return new CatalogPartitionSpec(
+                       new HashMap<String, String>() {{
+                               put("third", "2000");
+                               put("second", "bob");
+                       }});
+       }
+
+       private CatalogPartitionSpec createAnotherPartitionSpec() {
+               return new CatalogPartitionSpec(
+                       new HashMap<String, String>() {{
+                               put("third", "2010");
+                               put("second", "bob");
+                       }});
+       }
+
+       private CatalogPartitionSpec createPartitionSpecSubset() {
+               return new CatalogPartitionSpec(
+                       new HashMap<String, String>() {{
+                               put("second", "bob");
+                       }});
+       }
+
+       private CatalogPartitionSpec createInvalidPartitionSpecSubset() {
+               return new CatalogPartitionSpec(
+                       new HashMap<String, String>() {{
+                               put("third", "2010");
+                       }});
+       }
+
+       private CatalogPartition createPartition() {
+               return new GenericCatalogPartition(getBatchTableProperties());
+       }
+
+       private CatalogPartition createAnotherPartition() {
+               return new GenericCatalogPartition(getBatchTableProperties());
+       }
+
+       private CatalogPartition createPartition(Map<String, String> props) {
+               return new GenericCatalogPartition(props);
+       }
+
        private CatalogDatabase createDb() {
                return new GenericCatalogDatabase(new HashMap<String, String>() 
{{
                        put("k1", "v1");
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogTable.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPartition.java
similarity index 53%
copy from 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogTable.java
copy to 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPartition.java
index 3a25d8f..47dce25 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogTable.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPartition.java
@@ -18,15 +18,39 @@
 
 package org.apache.flink.table.catalog;
 
-import org.apache.flink.table.plan.stats.TableStats;
+import java.util.Map;
+import java.util.Optional;
 
 /**
- * Represents a table in a catalog.
+ * Represents a partition object in catalog.
  */
-public interface CatalogTable extends CatalogBaseTable {
+public interface CatalogPartition {
+
+       /**
+        * Get a map of properties associated with the partition.
+        *
+        * @return a map of properties with the partition
+        */
+       Map<String, String> getProperties();
+
+       /**
+        * Get a deep copy of the CatalogPartition instance.
+        *
+        * @return a copy of CatalogPartition instance
+        */
+       CatalogPartition copy();
+
+       /**
+        * Get a brief description of the partition.
+        *
+        * @return an optional short description of the partition
+        */
+       Optional<String> getDescription();
+
        /**
-        * Get the statistics of the table.
-        * @return table statistics
+        * Get a detailed description of the partition.
+        *
+        * @return an optional long description of the partition
         */
-       TableStats getStatistics();
+       Optional<String> getDetailedDescription();
 }
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPartitionSpec.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPartitionSpec.java
new file mode 100644
index 0000000..a2d4cee
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPartitionSpec.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Represents a partition spec object in catalog.
+ * Partition columns and values are NOT of strict order, and they need to be 
re-arranged to the correct order
+ * by comparing with a list of strictly ordered partition keys.
+ */
+public class CatalogPartitionSpec {
+
+       // An unmodifiable map as <partition key, value>
+       private final Map<String, String> partitionSpec;
+
+       public CatalogPartitionSpec(Map<String, String> partitionSpec) {
+               checkNotNull(partitionSpec, "partitionSpec cannot be null");
+
+               this.partitionSpec = Collections.unmodifiableMap(partitionSpec);
+       }
+
+       /**
+        * Get the partition spec as key-value map.
+        *
+        * @return a map of partition spec keys and values
+        */
+       public Map<String, String> getPartitionSpec() {
+               return partitionSpec;
+       }
+
+       @Override
+       public boolean equals(Object o) {
+               if (this == o) {
+                       return true;
+               }
+
+               if (o == null || getClass() != o.getClass()) {
+                       return false;
+               }
+
+               CatalogPartitionSpec that = (CatalogPartitionSpec) o;
+               return partitionSpec.equals(that.partitionSpec);
+       }
+
+       @Override
+       public int hashCode() {
+               return Objects.hash(partitionSpec);
+       }
+
+       @Override
+       public String toString() {
+               return "CatalogPartitionSpec{" + partitionSpec + '}';
+       }
+}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogTable.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogTable.java
index 3a25d8f..545fad9 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogTable.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogTable.java
@@ -20,6 +20,8 @@ package org.apache.flink.table.catalog;
 
 import org.apache.flink.table.plan.stats.TableStats;
 
+import java.util.List;
+
 /**
  * Represents a table in a catalog.
  */
@@ -29,4 +31,18 @@ public interface CatalogTable extends CatalogBaseTable {
         * @return table statistics
         */
        TableStats getStatistics();
+
+       /**
+        * Check if the table is partitioned or not.
+        *
+        * @return true if the table is partitioned; otherwise, false
+        */
+       boolean isPartitioned();
+
+       /**
+        * Get the partition keys of the table. This will be an empty list if 
the table is not partitioned.
+        *
+        * @return partition keys of the table
+        */
+       List<String> getPartitionKeys();
 }
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableCatalog.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableCatalog.java
index f50e2d3..f0b675d 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableCatalog.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableCatalog.java
@@ -20,7 +20,10 @@ package org.apache.flink.table.catalog;
 
 import org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
 import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
 
 import java.util.List;
 
@@ -132,4 +135,60 @@ public interface ReadableCatalog {
         */
        boolean tableExists(ObjectPath objectPath) throws CatalogException;
 
+       // ------ partitions ------
+
+       /**
+        * Get CatalogPartitionSpec of all partitions of the table.
+        *
+        * @param tablePath     path of the table
+        * @return a list of CatalogPartitionSpec of the table
+        *
+        * @throws TableNotExistException thrown if the table does not exist in 
the catalog
+        * @throws TableNotPartitionedException thrown if the table is not 
partitioned
+        * @throws CatalogException     in case of any runtime exception
+        */
+       List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath)
+               throws TableNotExistException, TableNotPartitionedException, 
CatalogException;
+
+       /**
+        * Get CatalogPartitionSpec of all partitions that are under the given 
CatalogPartitionSpec in the table.
+        *
+        * @param tablePath     path of the table
+        * @param partitionSpec the partition spec to list
+        * @return a list of CatalogPartitionSpec that are under the given 
CatalogPartitionSpec in the table
+        *
+        * @throws TableNotExistException thrown if the table does not exist in 
the catalog
+        * @throws TableNotPartitionedException thrown if the table is not 
partitioned
+        * @throws PartitionSpecInvalidException thrown if the given partition 
spec is invalid
+        * @throws CatalogException in case of any runtime exception
+        */
+       List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath, 
CatalogPartitionSpec partitionSpec)
+               throws TableNotExistException, TableNotPartitionedException, 
PartitionSpecInvalidException, CatalogException;
+
+       /**
+        * Get a partition of the given table.
+        * The given partition spec keys and values need to be matched exactly 
for a result.
+        *
+        * @param tablePath path of the table
+        * @param partitionSpec partition spec of partition to get
+        * @return the requested partition
+        *
+        * @throws TableNotExistException thrown if the table does not exist in 
the catalog
+        * @throws TableNotPartitionedException thrown if the table is not 
partitioned
+        * @throws PartitionSpecInvalidException thrown if the given partition 
spec is invalid
+        * @throws PartitionNotExistException thrown if the partition does not 
exist
+        * @throws CatalogException     in case of any runtime exception
+        */
+       CatalogPartition getPartition(ObjectPath tablePath, 
CatalogPartitionSpec partitionSpec)
+               throws TableNotExistException, TableNotPartitionedException, 
PartitionSpecInvalidException, PartitionNotExistException, CatalogException;
+
+       /**
+        * Check whether a partition exists or not.
+        *
+        * @param tablePath     path of the table
+        * @param partitionSpec partition spec of the partition to check
+        * @throws CatalogException in case of any runtime exception
+        */
+       boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec 
partitionSpec) throws CatalogException;
+
 }
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableWritableCatalog.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableWritableCatalog.java
index 75023cd..755ae27 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableWritableCatalog.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableWritableCatalog.java
@@ -22,8 +22,12 @@ import 
org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import 
org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
 import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
 
 /**
  * An interface responsible for manipulating catalog metadata.
@@ -105,9 +109,9 @@ public interface ReadableWritableCatalog extends 
ReadableCatalog {
        /**
         * Create a new table or view.
         *
-        * @param tablePath      Path of the table or view to be created
-        * @param table          The table definition
-        * @param ignoreIfExists Flag to specify behavior when a table or view 
already exists at the given path:
+        * @param tablePath path of the table or view to be created
+        * @param table the table definition
+        * @param ignoreIfExists flag to specify behavior when a table or view 
already exists at the given path:
         *                       if set to false, it throws a 
TableAlreadyExistException,
         *                       if set to true, do nothing.
         * @throws TableAlreadyExistException if table already exists and 
ignoreIfExists is false
@@ -119,10 +123,12 @@ public interface ReadableWritableCatalog extends 
ReadableCatalog {
 
        /**
         * Modify an existing table or view.
+        * Note that the new and old CatalogBaseTable must be of the same type. 
For example, this doesn't
+        * allow altering a regular table to a partitioned table, or altering a view to 
a table, and vice versa.
         *
-        * @param tableName         Path of the table or view to be modified
-        * @param newTable          The new table definition
-        * @param ignoreIfNotExists Flag to specify behavior when the table or 
view does not exist:
+        * @param tableName path of the table or view to be modified
+        * @param newTable the new table definition
+        * @param ignoreIfNotExists flag to specify behavior when the table or 
view does not exist:
         *                          if set to false, throw an exception,
         *                          if set to true, do nothing.
         * @throws TableNotExistException if the table does not exist
@@ -131,4 +137,62 @@ public interface ReadableWritableCatalog extends 
ReadableCatalog {
        void alterTable(ObjectPath tableName, CatalogBaseTable newTable, 
boolean ignoreIfNotExists)
                throws TableNotExistException, CatalogException;
 
+       // ------ partitions ------
+
+       /**
+        * Create a partition.
+        *
+        * @param tablePath path of the table.
+        * @param partitionSpec partition spec of the partition
+        * @param partition the partition to add.
+        * @param ignoreIfExists flag to specify behavior if a partition with the 
given spec already exists:
+        *                       if set to false, it throws a 
PartitionAlreadyExistsException,
+        *                       if set to true, nothing happens.
+        *
+        * @throws TableNotExistException thrown if the target table does not 
exist
+        * @throws TableNotPartitionedException thrown if the target table is 
not partitioned
+        * @throws PartitionSpecInvalidException thrown if the given partition 
spec is invalid
+        * @throws PartitionAlreadyExistsException thrown if the target 
partition already exists
+        * @throws CatalogException in case of any runtime exception
+        */
+       void createPartition(ObjectPath tablePath, CatalogPartitionSpec 
partitionSpec, CatalogPartition partition, boolean ignoreIfExists)
+               throws TableNotExistException, TableNotPartitionedException, 
PartitionSpecInvalidException, PartitionAlreadyExistsException, 
CatalogException;
+
+       /**
+        * Drop a partition.
+        *
+        * @param tablePath path of the table.
+        * @param partitionSpec partition spec of the partition to drop
+        * @param ignoreIfNotExists flag to specify behavior if the partition 
does not exist:
+        *                          if set to false, throw an exception,
+        *                          if set to true, nothing happens.
+        *
+        * @throws TableNotExistException thrown if the target table does not 
exist
+        * @throws TableNotPartitionedException thrown if the target table is 
not partitioned
+        * @throws PartitionSpecInvalidException thrown if the given partition 
spec is invalid
+        * @throws PartitionNotExistException thrown if the target partition 
does not exist
+        * @throws CatalogException in case of any runtime exception
+        */
+       void dropPartition(ObjectPath tablePath, CatalogPartitionSpec 
partitionSpec, boolean ignoreIfNotExists)
+               throws TableNotExistException, TableNotPartitionedException, 
PartitionSpecInvalidException, PartitionNotExistException, CatalogException;
+
+       /**
+        * Alter a partition.
+        *
+        * @param tablePath     path of the table
+        * @param partitionSpec partition spec of the partition
+        * @param newPartition new partition to replace the old one
+        * @param ignoreIfNotExists flag to specify behavior if the partition 
does not exist:
+        *                          if set to false, throw an exception,
+        *                          if set to true, nothing happens.
+        *
+        * @throws TableNotExistException thrown if the target table does not 
exist
+        * @throws TableNotPartitionedException thrown if the target table is 
not partitioned
+        * @throws PartitionSpecInvalidException thrown if the given partition 
spec is invalid
+        * @throws PartitionNotExistException thrown if the target partition 
does not exist
+        * @throws CatalogException in case of any runtime exception
+        */
+       void alterPartition(ObjectPath tablePath, CatalogPartitionSpec 
partitionSpec, CatalogPartition newPartition, boolean ignoreIfNotExists)
+               throws TableNotExistException, TableNotPartitionedException, 
PartitionSpecInvalidException, PartitionNotExistException, CatalogException;
+
 }
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/PartitionAlreadyExistsException.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/PartitionAlreadyExistsException.java
new file mode 100644
index 0000000..dd1e5e9
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/PartitionAlreadyExistsException.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.exceptions;
+
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.ObjectPath;
+
+/**
+ * Exception for trying to create a partition that already exists.
+ */
+public class PartitionAlreadyExistsException extends Exception {
+       private static final String MSG = "Partition %s of table %s in catalog 
%s already exists.";
+
+       public PartitionAlreadyExistsException(
+               String catalogName,
+               ObjectPath tablePath,
+               CatalogPartitionSpec partitionSpec) {
+
+               super(String.format(MSG, partitionSpec, 
tablePath.getFullName(), catalogName));
+       }
+
+       public PartitionAlreadyExistsException(
+               String catalogName,
+               ObjectPath tablePath,
+               CatalogPartitionSpec partitionSpec,
+               Throwable cause) {
+
+               super(String.format(MSG, partitionSpec, 
tablePath.getFullName(), catalogName), cause);
+       }
+}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogTable.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/PartitionNotExistException.java
similarity index 50%
copy from 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogTable.java
copy to 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/PartitionNotExistException.java
index 3a25d8f..597f85d 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogTable.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/PartitionNotExistException.java
@@ -16,17 +16,31 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.catalog;
+package org.apache.flink.table.catalog.exceptions;
 
-import org.apache.flink.table.plan.stats.TableStats;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.ObjectPath;
 
 /**
- * Represents a table in a catalog.
+ * Exception for operation on a nonexistent partition.
  */
-public interface CatalogTable extends CatalogBaseTable {
-       /**
-        * Get the statistics of the table.
-        * @return table statistics
-        */
-       TableStats getStatistics();
+public class PartitionNotExistException extends Exception {
+       private static final String MSG = "Partition %s of table %s in catalog 
%s does not exist.";
+
+       public PartitionNotExistException(
+               String catalogName,
+               ObjectPath tablePath,
+               CatalogPartitionSpec partitionSpec) {
+
+               super(String.format(MSG, partitionSpec, 
tablePath.getFullName(), catalogName), null);
+       }
+
+       public PartitionNotExistException(
+               String catalogName,
+               ObjectPath tablePath,
+               CatalogPartitionSpec partitionSpec,
+               Throwable cause) {
+
+               super(String.format(MSG, partitionSpec, 
tablePath.getFullName(), catalogName), cause);
+       }
 }
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/PartitionSpecInvalidException.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/PartitionSpecInvalidException.java
new file mode 100644
index 0000000..57e8b49
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/PartitionSpecInvalidException.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.exceptions;
+
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.ObjectPath;
+
+import java.util.List;
+
+/**
+ * Exception for invalid PartitionSpec compared with partition key list of a 
partitioned Table.
+ * For example, it is thrown when the size of PartitionSpec exceeds the size 
of partition key list, or
+ * when the size of PartitionSpec is 'n' but its keys don't match the first 
'n' keys in partition key list.
+ */
+public class PartitionSpecInvalidException extends Exception {
+       private static final String MSG = "PartitionSpec %s does not match 
partition keys %s of table %s in catalog %s.";
+
+       public PartitionSpecInvalidException(
+               String catalogName,
+               List<String> partitionKeys,
+               ObjectPath tablePath,
+               CatalogPartitionSpec partitionSpec) {
+
+               super(String.format(MSG, partitionSpec, partitionKeys, 
tablePath.getFullName(), catalogName), null);
+       }
+
+       public PartitionSpecInvalidException(
+               String catalogName,
+               List<String> partitionKeys,
+               ObjectPath tablePath,
+               CatalogPartitionSpec partitionSpec,
+               Throwable cause) {
+
+               super(String.format(MSG, partitionSpec, partitionKeys, 
tablePath.getFullName(), catalogName), cause);
+       }
+}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogTable.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/TableNotPartitionedException.java
similarity index 57%
copy from 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogTable.java
copy to 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/TableNotPartitionedException.java
index 3a25d8f..7bdc30f 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogTable.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/TableNotPartitionedException.java
@@ -16,17 +16,22 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.catalog;
+package org.apache.flink.table.catalog.exceptions;
 
-import org.apache.flink.table.plan.stats.TableStats;
+import org.apache.flink.table.catalog.ObjectPath;
 
 /**
- * Represents a table in a catalog.
+ * Exception for attempting a partition operation on a non-partitioned table.
  */
-public interface CatalogTable extends CatalogBaseTable {
-       /**
-        * Get the statistics of the table.
-        * @return table statistics
-        */
-       TableStats getStatistics();
+public class TableNotPartitionedException extends Exception {
+
+       private static final String MSG = "Table %s in catalog %s is not 
partitioned.";
+
+       public TableNotPartitionedException(String catalogName, ObjectPath 
tablePath) {
+               this(catalogName, tablePath, null);
+       }
+
+       public TableNotPartitionedException(String catalogName, ObjectPath 
tablePath, Throwable cause) {
+               super(String.format(MSG, tablePath.getFullName(), catalogName), 
cause);
+       }
 }

Reply via email to