This is an automated email from the ASF dual-hosted git repository.
kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 6cbc9bf [FLINK-11518] [table] Add partition related catalog APIs and
implement them in GenericInMemoryCatalog
6cbc9bf is described below
commit 6cbc9bf2cd6142c34074deaec56bb339d30b3b93
Author: Bowen L <[email protected]>
AuthorDate: Thu Apr 25 04:48:31 2019 -0700
[FLINK-11518] [table] Add partition related catalog APIs and implement them
in GenericInMemoryCatalog
This closes #8222
---
.../table/catalog/GenericCatalogPartition.java | 61 +++
.../flink/table/catalog/GenericCatalogTable.java | 45 ++-
.../table/catalog/GenericInMemoryCatalog.java | 157 ++++++++
.../flink/table/catalog/CatalogTestUtil.java | 23 +-
.../table/catalog/GenericInMemoryCatalogTest.java | 446 +++++++++++++++++++++
.../{CatalogTable.java => CatalogPartition.java} | 36 +-
.../flink/table/catalog/CatalogPartitionSpec.java | 75 ++++
.../apache/flink/table/catalog/CatalogTable.java | 16 +
.../flink/table/catalog/ReadableCatalog.java | 59 +++
.../table/catalog/ReadableWritableCatalog.java | 76 +++-
.../PartitionAlreadyExistsException.java | 46 +++
.../PartitionNotExistException.java} | 32 +-
.../exceptions/PartitionSpecInvalidException.java | 52 +++
.../TableNotPartitionedException.java} | 23 +-
14 files changed, 1100 insertions(+), 47 deletions(-)
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogPartition.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogPartition.java
new file mode 100644
index 0000000..085278e
--- /dev/null
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogPartition.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * A generic catalog partition implementation.
+ */
+public class GenericCatalogPartition implements CatalogPartition {
+ private final Map<String, String> properties;
+
+ private String comment = "This is a generic catalog partition";
+
+ public GenericCatalogPartition(Map<String, String> properties) {
+ this.properties = properties;
+ }
+
+ public GenericCatalogPartition(Map<String, String> properties, String
comment) {
+ this(properties);
+ this.comment = comment;
+ }
+
+ @Override
+ public Map<String, String> getProperties() {
+ return properties;
+ }
+
+ @Override
+ public CatalogPartition copy() {
+ return new GenericCatalogPartition(new HashMap<>(properties));
+ }
+
+ @Override
+ public Optional<String> getDescription() {
+ return Optional.of(comment);
+ }
+
+ @Override
+ public Optional<String> getDetailedDescription() {
+ return Optional.of("This is a generic catalog partition with
detailed description");
+ }
+}
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogTable.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogTable.java
index 4b1be8f..47498e7 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogTable.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogTable.java
@@ -21,7 +21,9 @@ package org.apache.flink.table.catalog;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.plan.stats.TableStats;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -33,23 +35,34 @@ import static
org.apache.flink.util.Preconditions.checkNotNull;
public class GenericCatalogTable implements CatalogTable {
// Schema of the table (column names and types)
private final TableSchema tableSchema;
- // Properties of the table
- private final Map<String, String> properties;
// Statistics of the table
private final TableStats tableStats;
+ // Partition keys if this is a partitioned table. It's an empty set if
the table is not partitioned
+ private final List<String> partitionKeys;
+ // Properties of the table
+ private final Map<String, String> properties;
// Comment of the table
private String comment = "This is a generic catalog table.";
- public GenericCatalogTable(TableSchema tableSchema, TableStats
tableStats, Map<String, String> properties) {
- this.tableSchema = tableSchema;
- this.tableStats = tableStats;
+ public GenericCatalogTable(
+ TableSchema tableSchema,
+ TableStats tableStats,
+ List<String> partitionKeys,
+ Map<String, String> properties,
+ String comment) {
+ this.tableSchema = checkNotNull(tableSchema, "tableSchema
cannot be null");
+ this.tableStats = checkNotNull(tableStats, "tableStats cannot
be null");
+ this.partitionKeys = checkNotNull(partitionKeys, "partitionKeys
cannot be null");
this.properties = checkNotNull(properties, "properties cannot
be null");
+ this.comment = comment;
}
- public GenericCatalogTable(TableSchema tableSchema, TableStats
tableStats, Map<String, String> properties,
- String comment) {
- this(tableSchema, tableStats, properties);
- this.comment = comment;
+ public GenericCatalogTable(
+ TableSchema tableSchema,
+ TableStats tableStats,
+ Map<String, String> properties,
+ String comment) {
+ this(tableSchema, tableStats, new ArrayList<>(), properties,
comment);
}
@Override
@@ -58,6 +71,16 @@ public class GenericCatalogTable implements CatalogTable {
}
@Override
+ public boolean isPartitioned() {
+ return !partitionKeys.isEmpty();
+ }
+
+ @Override
+ public List<String> getPartitionKeys() {
+ return partitionKeys;
+ }
+
+ @Override
public Map<String, String> getProperties() {
return properties;
}
@@ -69,8 +92,8 @@ public class GenericCatalogTable implements CatalogTable {
@Override
public GenericCatalogTable copy() {
- return new GenericCatalogTable(this.tableSchema.copy(),
this.tableStats.copy(),
- new HashMap<>(this.properties), comment);
+ return new GenericCatalogTable(
+ this.tableSchema.copy(), this.tableStats.copy(), new
ArrayList<>(partitionKeys), new HashMap<>(this.properties), comment);
}
@Override
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
index dcdfd9a..6a4e554 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
@@ -18,11 +18,16 @@
package org.apache.flink.table.catalog;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import
org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
import org.apache.flink.util.StringUtils;
import java.util.ArrayList;
@@ -46,6 +51,7 @@ public class GenericInMemoryCatalog implements
ReadableWritableCatalog {
private final String catalogName;
private final Map<String, CatalogDatabase> databases;
private final Map<ObjectPath, CatalogBaseTable> tables;
+ private final Map<ObjectPath, Map<CatalogPartitionSpec,
CatalogPartition>> partitions;
public GenericInMemoryCatalog(String name) {
checkArgument(!StringUtils.isNullOrWhitespaceOnly(name), "name
cannot be null or empty");
@@ -54,6 +60,7 @@ public class GenericInMemoryCatalog implements
ReadableWritableCatalog {
this.databases = new LinkedHashMap<>();
this.databases.put(DEFAULT_DB, new GenericCatalogDatabase(new
HashMap<>()));
this.tables = new LinkedHashMap<>();
+ this.partitions = new LinkedHashMap<>();
}
@Override
@@ -178,6 +185,10 @@ public class GenericInMemoryCatalog implements
ReadableWritableCatalog {
}
} else {
tables.put(tablePath, table.copy());
+
+ if ((table instanceof CatalogTable) && ((CatalogTable)
table).isPartitioned()) {
+ partitions.put(tablePath, new
LinkedHashMap<>());
+ }
}
}
@@ -187,6 +198,10 @@ public class GenericInMemoryCatalog implements
ReadableWritableCatalog {
checkArgument(tablePath != null);
checkArgument(newTable != null);
+ // TODO: validate the new and old CatalogBaseTable must be of
the same type. For example, this doesn't
+ // allow alter a regular table to partitioned
table, or alter a view to a table, and vice versa.
+ // And also add unit tests.
+
if (tableExists(tablePath)) {
tables.put(tablePath, newTable.copy());
} else if (!ignoreIfNotExists) {
@@ -202,6 +217,8 @@ public class GenericInMemoryCatalog implements
ReadableWritableCatalog {
if (tableExists(tablePath)) {
tables.remove(tablePath);
+
+ partitions.remove(tablePath);
} else if (!ignoreIfNotExists) {
throw new TableNotExistException(catalogName,
tablePath);
}
@@ -220,6 +237,10 @@ public class GenericInMemoryCatalog implements
ReadableWritableCatalog {
throw new
TableAlreadyExistException(catalogName, newPath);
} else {
tables.put(newPath, tables.remove(tablePath));
+
+ if (partitions.containsKey(tablePath)) {
+ partitions.put(newPath,
partitions.remove(tablePath));
+ }
}
} else if (!ignoreIfNotExists) {
throw new TableNotExistException(catalogName,
tablePath);
@@ -269,4 +290,140 @@ public class GenericInMemoryCatalog implements
ReadableWritableCatalog {
return tablePath != null &&
databaseExists(tablePath.getDatabaseName()) && tables.containsKey(tablePath);
}
+ // ------ partitions ------
+
+ @Override
+ public void createPartition(ObjectPath tablePath, CatalogPartitionSpec
partitionSpec, CatalogPartition partition, boolean ignoreIfExists)
+ throws TableNotExistException, TableNotPartitionedException,
PartitionSpecInvalidException, PartitionAlreadyExistsException,
CatalogException {
+
+ validatePartitionSpec(tablePath, partitionSpec);
+
+ if (partitionExists(tablePath, partitionSpec)) {
+ if (!ignoreIfExists) {
+ throw new
PartitionAlreadyExistsException(catalogName, tablePath, partitionSpec);
+ }
+ } else {
+ partitions.get(tablePath).put(partitionSpec,
partition.copy());
+ }
+ }
+
+ @Override
+ public void dropPartition(ObjectPath tablePath, CatalogPartitionSpec
partitionSpec, boolean ignoreIfNotExists)
+ throws TableNotExistException, TableNotPartitionedException,
PartitionSpecInvalidException, PartitionNotExistException, CatalogException {
+
+ validatePartitionSpec(tablePath, partitionSpec);
+
+ if (partitionExists(tablePath, partitionSpec)) {
+ partitions.get(tablePath).remove(partitionSpec);
+ } else if (!ignoreIfNotExists) {
+ throw new PartitionNotExistException(catalogName,
tablePath, partitionSpec);
+ }
+ }
+
+ @Override
+ public void alterPartition(ObjectPath tablePath, CatalogPartitionSpec
partitionSpec, CatalogPartition newPartition, boolean ignoreIfNotExists)
+ throws TableNotExistException, TableNotPartitionedException,
PartitionSpecInvalidException, PartitionNotExistException, CatalogException {
+
+ validatePartitionSpec(tablePath, partitionSpec);
+
+ if (partitionExists(tablePath, partitionSpec)) {
+ partitions.get(tablePath).put(partitionSpec,
newPartition.copy());
+ } else if (!ignoreIfNotExists) {
+ throw new PartitionNotExistException(catalogName,
tablePath, partitionSpec);
+ }
+ }
+
+ @Override
+ public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath)
+ throws TableNotExistException, TableNotPartitionedException,
CatalogException {
+
+ validatePartitionedTable(tablePath);
+
+ return new ArrayList<>(partitions.get(tablePath).keySet());
+ }
+
+ @Override
+ public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath,
CatalogPartitionSpec partitionSpec)
+ throws TableNotExistException, TableNotPartitionedException,
PartitionSpecInvalidException, CatalogException {
+
+ validatePartitionSpec(tablePath, partitionSpec);
+
+ return partitions.get(tablePath).keySet().stream()
+ .filter(ps ->
ps.getPartitionSpec().entrySet().containsAll(partitionSpec.getPartitionSpec().entrySet()))
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public CatalogPartition getPartition(ObjectPath tablePath,
CatalogPartitionSpec partitionSpec)
+ throws TableNotExistException, TableNotPartitionedException,
PartitionSpecInvalidException, PartitionNotExistException, CatalogException {
+
+ CatalogTable table = validatePartitionSpec(tablePath,
partitionSpec);
+
+ if (partitionSpec.getPartitionSpec().size() <
table.getPartitionKeys().size()) {
+ throw new PartitionSpecInvalidException(catalogName,
table.getPartitionKeys(), tablePath, partitionSpec);
+ }
+
+ if (partitionExists(tablePath, partitionSpec)) {
+ return
partitions.get(tablePath).get(partitionSpec).copy();
+ } else {
+ throw new PartitionNotExistException(catalogName,
tablePath, partitionSpec);
+ }
+ }
+
+ @Override
+ public boolean partitionExists(ObjectPath tablePath,
CatalogPartitionSpec partitionSpec)
+ throws CatalogException {
+
+ return partitions.containsKey(tablePath) &&
partitions.get(tablePath).containsKey(partitionSpec);
+ }
+
+ /**
+ * Validate the partitioned table and partitionSpec.
+ */
+ private CatalogTable validatePartitionSpec(ObjectPath tablePath,
CatalogPartitionSpec partitionSpec)
+ throws TableNotExistException, TableNotPartitionedException,
PartitionSpecInvalidException {
+
+ CatalogTable table = validatePartitionedTable(tablePath);
+
+ List<String> partitionKeys = table.getPartitionKeys();
+ Map<String, String> spec = partitionSpec.getPartitionSpec();
+
+ // The size of partition spec should not exceed the size of
partition keys
+ if (partitionKeys.size() < spec.size()) {
+ throw new PartitionSpecInvalidException(catalogName,
partitionKeys, tablePath, partitionSpec);
+ } else {
+ int size = spec.size();
+
+ // PartitionSpec should contain the first 'size' number
of keys in partition key list
+ for (int i = 0; i < size; i++) {
+ if (!spec.containsKey(partitionKeys.get(i))) {
+ throw new
PartitionSpecInvalidException(catalogName, partitionKeys, tablePath,
partitionSpec);
+ }
+ }
+ }
+
+ return table;
+ }
+
+ /**
+ * Validate the partitioned table.
+ */
+ private CatalogTable validatePartitionedTable(ObjectPath tablePath)
+ throws TableNotExistException, TableNotPartitionedException {
+
+ CatalogBaseTable baseTable = getTable(tablePath);
+
+ if (!(baseTable instanceof CatalogTable)) {
+ throw new CatalogException(
+ String.format("%s in Catalog %s is not a
CatalogTable", tablePath.getFullName(), catalogName));
+ }
+
+ CatalogTable table = (CatalogTable) baseTable;
+
+ if (!table.isPartitioned()) {
+ throw new TableNotPartitionedException(catalogName,
tablePath);
+ }
+
+ return table;
+ }
}
diff --git
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java
index 69ccb63..9974272 100644
---
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java
+++
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java
@@ -25,6 +25,7 @@ import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.plan.stats.TableStats;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import static org.junit.Assert.assertEquals;
@@ -36,7 +37,7 @@ public class CatalogTestUtil {
public static GenericCatalogTable createTable(String comment) {
TableSchema tableSchema =
TableSchema.fromTypeInfo(getRowTypeInfo());
- return createTable(tableSchema, createTableStats(), new
HashMap<String, String>(), comment);
+ return new GenericCatalogTable(tableSchema, createTableStats(),
new HashMap<>(), comment);
}
public static RowTypeInfo getRowTypeInfo() {
@@ -49,14 +50,21 @@ public class CatalogTestUtil {
return new TableStats(2);
}
- public static GenericCatalogTable createTable(TableSchema schema,
Map<String, String> tableProperties,
+ public static GenericCatalogTable createTable(
+ TableSchema schema,
+ Map<String, String> tableProperties,
String comment) {
- return createTable(schema, new TableStats(0), tableProperties,
comment);
+
+ return new GenericCatalogTable(schema, new TableStats(0),
tableProperties, comment);
}
- public static GenericCatalogTable createTable(TableSchema schema,
TableStats stats,
- Map<String, String> tableProperties, String comment) {
- return new GenericCatalogTable(schema, stats, tableProperties,
comment);
+ public static GenericCatalogTable createPartitionedTable(
+ TableSchema schema,
+ List<String> partitionKeys,
+ Map<String, String> tableProperties,
+ String comment) {
+
+ return new GenericCatalogTable(schema, new TableStats(0),
partitionKeys, tableProperties, comment);
}
public static void checkEquals(GenericCatalogTable t1,
GenericCatalogTable t2) {
@@ -79,4 +87,7 @@ public class CatalogTestUtil {
assertEquals(d1.getProperties(), d2.getProperties());
}
+ protected static void checkEquals(CatalogPartition p1, CatalogPartition
p2) {
+ assertEquals(p1.getProperties(), p2.getProperties());
+ }
}
diff --git
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
index f679238..ad9fd92 100644
---
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
+++
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
@@ -24,8 +24,12 @@ import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import
org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
import org.junit.After;
import org.junit.Before;
@@ -42,6 +46,7 @@ import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
/**
@@ -115,10 +120,12 @@ public class GenericInMemoryCatalogTest {
CatalogDatabase database = catalog.getDatabase(db1);
assertTrue(TEST_COMMENT.equals(database.getDescription().get()));
+ // Non-partitioned table
GenericCatalogTable table = createTable();
catalog.createTable(path1, table, false);
CatalogBaseTable tableCreated = catalog.getTable(path1);
+
CatalogTestUtil.checkEquals(table, (GenericCatalogTable)
tableCreated);
assertEquals(TABLE_COMMENT,
tableCreated.getDescription().get());
@@ -128,6 +135,17 @@ public class GenericInMemoryCatalogTest {
assertEquals(path1.getObjectName(), tables.get(0));
catalog.dropTable(path1, false);
+
+ // Partitioned table
+ table = createPartitionedTable();
+ catalog.createTable(path1, table, false);
+
+ CatalogTestUtil.checkEquals(table, (GenericCatalogTable)
catalog.getTable(path1));
+
+ tables = catalog.listTables(db1);
+
+ assertEquals(1, tables.size());
+ assertEquals(path1.getObjectName(), tables.get(0));
}
@Test
@@ -191,6 +209,19 @@ public class GenericInMemoryCatalogTest {
catalog.dropTable(path1, false);
assertFalse(catalog.tableExists(path1));
+
+ // Partitioned table
+ catalog.createTable(path1, createPartitionedTable(), false);
+ CatalogPartition catalogPartition = createPartition();
+ CatalogPartitionSpec catalogPartitionSpec =
createPartitionSpec();
+ catalog.createPartition(path1, catalogPartitionSpec,
catalogPartition, false);
+
+ assertTrue(catalog.tableExists(path1));
+
+ catalog.dropTable(path1, false);
+
+ assertFalse(catalog.tableExists(path1));
+ assertFalse(catalog.partitionExists(path1,
catalogPartitionSpec));
}
@Test
@@ -240,6 +271,17 @@ public class GenericInMemoryCatalogTest {
CatalogTestUtil.checkEquals(newTable, (GenericCatalogTable)
catalog.getTable(path1));
catalog.dropTable(path1, false);
+
+ // Partitioned table
+ table = createPartitionedTable();
+ catalog.createTable(path1, table, false);
+
+ CatalogTestUtil.checkEquals(table, (GenericCatalogTable)
catalog.getTable(path1));
+
+ newTable = createAnotherPartitionedTable();
+ catalog.alterTable(path1, newTable, false);
+
+ CatalogTestUtil.checkEquals(newTable, (GenericCatalogTable)
catalog.getTable(path1));
}
@Test
@@ -260,6 +302,8 @@ public class GenericInMemoryCatalogTest {
@Test
public void testRenameTable() throws Exception {
catalog.createDatabase(db1, createDb(), false);
+
+ // Non-partitioned table
GenericCatalogTable table = createTable();
catalog.createTable(path1, table, false);
@@ -269,6 +313,25 @@ public class GenericInMemoryCatalogTest {
CatalogTestUtil.checkEquals(table, (GenericCatalogTable)
catalog.getTable(path3));
assertFalse(catalog.tableExists(path1));
+
+ catalog.dropTable(path3, false);
+
+ // Partitioned table
+ table = createPartitionedTable();
+ catalog.createTable(path1, table, false);
+ CatalogPartition catalogPartition = createPartition();
+ CatalogPartitionSpec catalogPartitionSpec =
createPartitionSpec();
+ catalog.createPartition(path1, catalogPartitionSpec,
catalogPartition, false);
+
+ CatalogTestUtil.checkEquals(table, (GenericCatalogTable)
catalog.getTable(path1));
+ assertTrue(catalog.partitionExists(path1,
catalogPartitionSpec));
+
+ catalog.renameTable(path1, t2, false);
+
+ CatalogTestUtil.checkEquals(table, (GenericCatalogTable)
catalog.getTable(path3));
+ assertTrue(catalog.partitionExists(path3,
catalogPartitionSpec));
+ assertFalse(catalog.tableExists(path1));
+ assertFalse(catalog.partitionExists(path1,
catalogPartitionSpec));
}
@Test
@@ -562,6 +625,327 @@ public class GenericInMemoryCatalogTest {
catalog.dropTable(viewPath2, false);
}
+ // ------ partitions ------
+
+ @Test
+ public void testCreatePartition() throws Exception {
+ CatalogTable table = createPartitionedTable();
+ catalog.createDatabase(db1, createDb(), false);
+ catalog.createTable(path1, table, false);
+
+ assertTrue(catalog.listPartitions(path1).isEmpty());
+
+ CatalogPartitionSpec partitionSpec = createPartitionSpec();
+ catalog.createPartition(path1, partitionSpec,
createPartition(), false);
+
+ assertEquals(Arrays.asList(partitionSpec),
catalog.listPartitions(path1));
+ assertEquals(Arrays.asList(partitionSpec),
catalog.listPartitions(path1, createPartitionSpecSubset()));
+ CatalogTestUtil.checkEquals(createPartition(),
catalog.getPartition(path1, createPartitionSpec()));
+
+ CatalogPartitionSpec anotherPartitionSpec =
createAnotherPartitionSpec();
+ CatalogPartition anotherPartition = createAnotherPartition();
+ catalog.createPartition(path1, anotherPartitionSpec,
anotherPartition, false);
+
+ assertEquals(Arrays.asList(partitionSpec,
anotherPartitionSpec), catalog.listPartitions(path1));
+ assertEquals(Arrays.asList(partitionSpec,
anotherPartitionSpec), catalog.listPartitions(path1,
createPartitionSpecSubset()));
+ CatalogTestUtil.checkEquals(anotherPartition,
catalog.getPartition(path1, anotherPartitionSpec));
+
+ CatalogPartitionSpec invalid =
createInvalidPartitionSpecSubset();
+ exception.expect(PartitionSpecInvalidException.class);
+ exception.expectMessage(
+ String.format("PartitionSpec %s does not match
partition keys %s of table %s in catalog %s",
+ invalid, table.getPartitionKeys(),
path1.getFullName(), testCatalogName));
+ catalog.listPartitions(path1, invalid);
+ }
+
+ @Test
+ public void testCreatePartition_TableNotExistException() throws
Exception {
+ catalog.createDatabase(db1, createDb(), false);
+
+ exception.expect(TableNotExistException.class);
+ exception.expectMessage(
+ String.format("Table (or view) %s does not exist in
Catalog %s.", path1.getFullName(), testCatalogName));
+ catalog.createPartition(path1, createPartitionSpec(),
createPartition(), false);
+ }
+
+ @Test
+ public void testCreatePartition_TableNotPartitionedException() throws
Exception {
+ catalog.createDatabase(db1, createDb(), false);
+ catalog.createTable(path1, createTable(), false);
+
+ exception.expect(TableNotPartitionedException.class);
+ exception.expectMessage(
+ String.format("Table %s in catalog %s is not
partitioned.", path1.getFullName(), testCatalogName));
+ catalog.createPartition(path1, createPartitionSpec(),
createPartition(), false);
+ }
+
+ @Test
+ public void testCreatePartition_PartitionSpecInvalidException() throws
Exception {
+ catalog.createDatabase(db1, createDb(), false);
+ CatalogTable table = createPartitionedTable();
+ catalog.createTable(path1, table, false);
+
+ CatalogPartitionSpec partitionSpec =
createInvalidPartitionSpecSubset();
+ exception.expect(PartitionSpecInvalidException.class);
+ exception.expectMessage(
+ String.format("PartitionSpec %s does not match
partition keys %s of table %s in catalog %s.",
+ partitionSpec, table.getPartitionKeys(),
path1.getFullName(), testCatalogName));
+ catalog.createPartition(path1, partitionSpec,
createPartition(), false);
+ }
+
+ @Test
+ public void testCreatePartition_PartitionAlreadyExistsException()
throws Exception {
+ catalog.createDatabase(db1, createDb(), false);
+ catalog.createTable(path1, createPartitionedTable(), false);
+ CatalogPartition partition = createPartition();
+ catalog.createPartition(path1, createPartitionSpec(),
partition, false);
+
+ CatalogPartitionSpec partitionSpec = createPartitionSpec();
+
+ exception.expect(PartitionAlreadyExistsException.class);
+ exception.expectMessage(
+ String.format("Partition %s of table %s in catalog %s
already exists.",
+ partitionSpec, path1.getFullName(),
testCatalogName));
+ catalog.createPartition(path1, partitionSpec,
createPartition(), false);
+ }
+
+ @Test
+ public void testCreatePartition_PartitionAlreadyExists_ignored() throws
Exception {
+ catalog.createDatabase(db1, createDb(), false);
+ catalog.createTable(path1, createPartitionedTable(), false);
+
+ CatalogPartitionSpec partitionSpec = createPartitionSpec();
+ catalog.createPartition(path1, partitionSpec,
createPartition(), false);
+ catalog.createPartition(path1, partitionSpec,
createPartition(), true);
+ }
+
+ @Test
+ public void testDropPartition() throws Exception {
+ catalog.createDatabase(db1, createDb(), false);
+ catalog.createTable(path1, createPartitionedTable(), false);
+ catalog.createPartition(path1, createPartitionSpec(),
createPartition(), false);
+
+ assertEquals(Arrays.asList(createPartitionSpec()),
catalog.listPartitions(path1));
+
+ catalog.dropPartition(path1, createPartitionSpec(), false);
+
+ assertEquals(Arrays.asList(), catalog.listPartitions(path1));
+ }
+
+ @Test
+ public void testDropPartition_TableNotExistException() throws Exception
{
+ catalog.createDatabase(db1, createDb(), false);
+
+ exception.expect(TableNotExistException.class);
+ exception.expectMessage(
+ String.format("Table (or view) %s does not exist in
Catalog %s.", path1.getFullName(), testCatalogName));
+ catalog.dropPartition(path1, createPartitionSpec(), false);
+ }
+
+ @Test
+ public void testDropPartition_TableNotPartitionedException() throws
Exception {
+ catalog.createDatabase(db1, createDb(), false);
+ catalog.createTable(path1, createTable(), false);
+
+ exception.expect(TableNotPartitionedException.class);
+ exception.expectMessage(
+ String.format("Table %s in catalog %s is not
partitioned.", path1.getFullName(), testCatalogName));
+ catalog.dropPartition(path1, createPartitionSpec(), false);
+ }
+
+ @Test
+ public void testDropPartition_PartitionSpecInvalidException() throws
Exception {
+ catalog.createDatabase(db1, createDb(), false);
+ CatalogTable table = createPartitionedTable();
+ catalog.createTable(path1, table, false);
+
+ CatalogPartitionSpec partitionSpec =
createInvalidPartitionSpecSubset();
+ exception.expect(PartitionSpecInvalidException.class);
+ exception.expectMessage(
+ String.format("PartitionSpec %s does not match
partition keys %s of table %s in catalog %s.",
+ partitionSpec, table.getPartitionKeys(),
path1.getFullName(), testCatalogName));
+ catalog.dropPartition(path1, partitionSpec, false);
+ }
+
+ @Test
+ public void testDropPartition_PartitionNotExistException() throws
Exception {
+ catalog.createDatabase(db1, createDb(), false);
+ catalog.createTable(path1, createPartitionedTable(), false);
+
+ CatalogPartitionSpec partitionSpec = createPartitionSpec();
+ exception.expect(PartitionNotExistException.class);
+ exception.expectMessage(
+ String.format("Partition %s of table %s in catalog %s
does not exist.", partitionSpec, path1.getFullName(), testCatalogName));
+ catalog.dropPartition(path1, partitionSpec, false);
+ }
+
+ @Test
+ public void testDropPartition_PartitionNotExist_ignored() throws
Exception {
+ catalog.createDatabase(db1, createDb(), false);
+ catalog.createTable(path1, createPartitionedTable(), false);
+ catalog.dropPartition(path1, createPartitionSpec(), true);
+ }
+
+ @Test
+ public void testAlterPartition() throws Exception {
+ catalog.createDatabase(db1, createDb(), false);
+ catalog.createTable(path1, createPartitionedTable(), false);
+
+ CatalogPartitionSpec partitionSpec = createPartitionSpec();
+ catalog.createPartition(path1, partitionSpec,
createPartition(), false);
+
+ assertEquals(Arrays.asList(partitionSpec),
catalog.listPartitions(path1));
+
+ CatalogPartition cp = catalog.getPartition(path1,
createPartitionSpec());
+ CatalogTestUtil.checkEquals(createPartition(), cp);
+
+ assertNull(cp.getProperties().get("k"));
+
+ Map<String, String> partitionProperties =
getBatchTableProperties();
+ partitionProperties.put("k", "v");
+
+ CatalogPartition another = createPartition(partitionProperties);
+ catalog.alterPartition(path1, createPartitionSpec(), another,
false);
+
+ assertEquals(Arrays.asList(createPartitionSpec()),
catalog.listPartitions(path1));
+
+ cp = catalog.getPartition(path1, createPartitionSpec());
+ CatalogTestUtil.checkEquals(another, cp);
+
+ assertEquals("v", cp.getProperties().get("k"));
+ }
+
+ @Test
+ public void testAlterPartition_TableNotExistException() throws
Exception {
+ catalog.createDatabase(db1, createDb(), false);
+
+ CatalogPartitionSpec partitionSpec = createPartitionSpec();
+ exception.expect(TableNotExistException.class);
+ exception.expectMessage(
+ String.format("Table (or view) %s does not exist in
Catalog %s.", path1.getFullName(), testCatalogName));
+ catalog.alterPartition(path1, partitionSpec, createPartition(),
false);
+ }
+
+ @Test
+ public void testAlterPartition_TableNotPartitionedException() throws
Exception {
+ catalog.createDatabase(db1, createDb(), false);
+ catalog.createTable(path1, createTable(), false);
+
+ CatalogPartitionSpec partitionSpec = createPartitionSpec();
+ exception.expect(TableNotPartitionedException.class);
+ exception.expectMessage(
+ String.format("Table %s in catalog %s is not
partitioned.", path1.getFullName(), testCatalogName));
+ catalog.alterPartition(path1, partitionSpec, createPartition(),
false);
+ }
+
+ @Test
+ public void testAlterPartition_PartitionSpecInvalidException() throws
Exception {
+ catalog.createDatabase(db1, createDb(), false);
+ CatalogTable table = createPartitionedTable();
+ catalog.createTable(path1, table, false);
+
+ CatalogPartitionSpec partitionSpec =
createInvalidPartitionSpecSubset();
+ exception.expect(PartitionSpecInvalidException.class);
+ exception.expectMessage(
+ String.format("PartitionSpec %s does not match
partition keys %s of table %s in catalog %s.",
+ partitionSpec, table.getPartitionKeys(),
path1.getFullName(), testCatalogName));
+ catalog.alterPartition(path1, partitionSpec, createPartition(),
false);
+ }
+
+ @Test
+ public void testAlterPartition_PartitionNotExistException() throws
Exception {
+ catalog.createDatabase(db1, createDb(), false);
+ catalog.createTable(path1, createPartitionedTable(), false);
+
+ CatalogPartition catalogPartition = createPartition();
+ CatalogPartitionSpec partitionSpec = createPartitionSpec();
+ exception.expect(PartitionNotExistException.class);
+ exception.expectMessage(
+ String.format("Partition %s of table %s in catalog %s
does not exist.",
+ partitionSpec, path1.getFullName(),
testCatalogName));
+ catalog.alterPartition(path1, partitionSpec, catalogPartition,
false);
+ }
+
+ @Test
+ public void testAlterPartition_PartitionNotExist_ignored() throws
Exception {
+ catalog.createDatabase(db1, createDb(), false);
+ catalog.createTable(path1, createPartitionedTable(), false);
+ catalog.alterPartition(path1, createPartitionSpec(),
createPartition(), true);
+ }
+
+ @Test
+ public void testGetPartition_TableNotExistException() throws Exception {
+ exception.expect(TableNotExistException.class);
+ catalog.getPartition(path1, createPartitionSpec());
+ }
+
+ @Test
+ public void testGetPartition_TableNotPartitionedException() throws
Exception {
+ catalog.createDatabase(db1, createDb(), false);
+ catalog.createTable(path1, createTable(), false);
+
+ exception.expect(TableNotPartitionedException.class);
+ exception.expectMessage(
+ String.format("Table %s in catalog %s is not
partitioned.", path1.getFullName(), testCatalogName));
+ catalog.getPartition(path1, createPartitionSpec());
+ }
+
+ @Test
+ public void
testGetPartition_PartitionSpecInvalidException_invalidPartitionSpec() throws
Exception {
+ catalog.createDatabase(db1, createDb(), false);
+ CatalogTable table = createPartitionedTable();
+ catalog.createTable(path1, table, false);
+
+ CatalogPartitionSpec partitionSpec =
createInvalidPartitionSpecSubset();
+ exception.expect(PartitionSpecInvalidException.class);
+ exception.expectMessage(
+ String.format("PartitionSpec %s does not match
partition keys %s of table %s in catalog %s.",
+ partitionSpec, table.getPartitionKeys(),
path1.getFullName(), testCatalogName));
+ catalog.getPartition(path1, partitionSpec);
+ }
+
+ @Test
+ public void
testGetPartition_PartitionSpecInvalidException_sizeNotEqual() throws Exception {
+ catalog.createDatabase(db1, createDb(), false);
+ CatalogTable table = createPartitionedTable();
+ catalog.createTable(path1, table, false);
+
+ CatalogPartitionSpec partitionSpec = new CatalogPartitionSpec(
+ new HashMap<String, String>() {{
+ put("second", "bob");
+ }}
+ );
+ exception.expect(PartitionSpecInvalidException.class);
+ exception.expectMessage(
+ String.format("PartitionSpec %s does not match
partition keys %s of table %s in catalog %s.",
+ partitionSpec, table.getPartitionKeys(),
path1.getFullName(), testCatalogName));
+ catalog.getPartition(path1, partitionSpec);
+ }
+
+ @Test
+ public void testGetPartition_PartitionNotExistException() throws
Exception {
+ catalog.createDatabase(db1, createDb(), false);
+ catalog.createTable(path1, createPartitionedTable(), false);
+
+ CatalogPartitionSpec partitionSpec = createPartitionSpec();
+ exception.expect(PartitionNotExistException.class);
+ exception.expectMessage(
+ String.format("Partition %s of table %s in catalog %s
does not exist.",
+ partitionSpec, path1.getFullName(),
testCatalogName));
+ catalog.getPartition(path1, partitionSpec);
+ }
+
+ @Test
+ public void testPartitionExists() throws Exception {
+ catalog.createDatabase(db1, createDb(), false);
+ catalog.createTable(path1, createPartitionedTable(), false);
+ catalog.createPartition(path1, createPartitionSpec(),
createPartition(), false);
+
+ assertTrue(catalog.partitionExists(path1,
createPartitionSpec()));
+ assertFalse(catalog.partitionExists(path2,
createPartitionSpec()));
+
assertFalse(catalog.partitionExists(ObjectPath.fromString("non.exist"),
createPartitionSpec()));
+ }
+
// ------ utilities ------
private GenericCatalogTable createStreamingTable() {
@@ -582,6 +966,68 @@ public class GenericInMemoryCatalogTest {
getBatchTableProperties(), TABLE_COMMENT);
}
+ protected GenericCatalogTable createPartitionedTable() {
+ return CatalogTestUtil.createPartitionedTable(
+ createTableSchema(),
+ createPartitionKeys(),
+ getBatchTableProperties(),
+ TABLE_COMMENT);
+ }
+
+ protected GenericCatalogTable createAnotherPartitionedTable() {
+ return CatalogTestUtil.createPartitionedTable(
+ createAnotherTableSchema(),
+ createPartitionKeys(),
+ getBatchTableProperties(),
+ TABLE_COMMENT);
+ }
+
+ private List<String> createPartitionKeys() {
+ return Arrays.asList("second", "third");
+ }
+
+ private CatalogPartitionSpec createPartitionSpec() {
+ return new CatalogPartitionSpec(
+ new HashMap<String, String>() {{
+ put("third", "2000");
+ put("second", "bob");
+ }});
+ }
+
+ private CatalogPartitionSpec createAnotherPartitionSpec() {
+ return new CatalogPartitionSpec(
+ new HashMap<String, String>() {{
+ put("third", "2010");
+ put("second", "bob");
+ }});
+ }
+
+ private CatalogPartitionSpec createPartitionSpecSubset() {
+ return new CatalogPartitionSpec(
+ new HashMap<String, String>() {{
+ put("second", "bob");
+ }});
+ }
+
+ private CatalogPartitionSpec createInvalidPartitionSpecSubset() {
+ return new CatalogPartitionSpec(
+ new HashMap<String, String>() {{
+ put("third", "2010");
+ }});
+ }
+
+ private CatalogPartition createPartition() {
+ return new GenericCatalogPartition(getBatchTableProperties());
+ }
+
+ private CatalogPartition createAnotherPartition() {
+ return new GenericCatalogPartition(getBatchTableProperties());
+ }
+
+ private CatalogPartition createPartition(Map<String, String> props) {
+ return new GenericCatalogPartition(props);
+ }
+
private CatalogDatabase createDb() {
return new GenericCatalogDatabase(new HashMap<String, String>()
{{
put("k1", "v1");
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogTable.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPartition.java
similarity index 53%
copy from
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogTable.java
copy to
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPartition.java
index 3a25d8f..47dce25 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogTable.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPartition.java
@@ -18,15 +18,39 @@
package org.apache.flink.table.catalog;
-import org.apache.flink.table.plan.stats.TableStats;
+import java.util.Map;
+import java.util.Optional;
/**
- * Represents a table in a catalog.
+ * Represents a partition object in catalog.
*/
-public interface CatalogTable extends CatalogBaseTable {
+public interface CatalogPartition {
+
+ /**
+ * Get a map of properties associated with the partition.
+ *
+ * @return a map of properties with the partition
+ */
+ Map<String, String> getProperties();
+
+ /**
+ * Get a deep copy of the CatalogPartition instance.
+ *
+ * @return a copy of CatalogPartition instance
+ */
+ CatalogPartition copy();
+
+ /**
+ * Get a brief description of the database.
+ *
+ * @return an optional short description of the database
+ */
+ Optional<String> getDescription();
+
/**
- * Get the statistics of the table.
- * @return table statistics
+ * Get a detailed description of the database.
+ *
+ * @return an optional long description of the database
*/
- TableStats getStatistics();
+ Optional<String> getDetailedDescription();
}
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPartitionSpec.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPartitionSpec.java
new file mode 100644
index 0000000..a2d4cee
--- /dev/null
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPartitionSpec.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Represents a partition spec object in catalog.
+ * Partition columns and values are NOT of strict order, and they need to be
re-arranged to the correct order
+ * by comparing with a list of strictly ordered partition keys.
+ */
+public class CatalogPartitionSpec {
+
+ // An unmodifiable map as <partition key, value>
+ private final Map<String, String> partitionSpec;
+
+ public CatalogPartitionSpec(Map<String, String> partitionSpec) {
+ checkNotNull(partitionSpec, "partitionSpec cannot be null");
+
+ this.partitionSpec = Collections.unmodifiableMap(partitionSpec);
+ }
+
+ /**
+ * Get the partition spec as key-value map.
+ *
+ * @return a map of partition spec keys and values
+ */
+ public Map<String, String> getPartitionSpec() {
+ return partitionSpec;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ CatalogPartitionSpec that = (CatalogPartitionSpec) o;
+ return partitionSpec.equals(that.partitionSpec);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(partitionSpec);
+ }
+
+ @Override
+ public String toString() {
+ return "CatalogPartitionSpec{" + partitionSpec + '}';
+ }
+}
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogTable.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogTable.java
index 3a25d8f..545fad9 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogTable.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogTable.java
@@ -20,6 +20,8 @@ package org.apache.flink.table.catalog;
import org.apache.flink.table.plan.stats.TableStats;
+import java.util.List;
+
/**
* Represents a table in a catalog.
*/
@@ -29,4 +31,18 @@ public interface CatalogTable extends CatalogBaseTable {
* @return table statistics
*/
TableStats getStatistics();
+
+ /**
+ * Check if the table is partitioned or not.
+ *
+ * @return true if the table is partitioned; otherwise, false
+ */
+ boolean isPartitioned();
+
+ /**
+ * Get the partition keys of the table. This will be an empty set if
the table is not partitioned.
+ *
+ * @return partition keys of the table
+ */
+ List<String> getPartitionKeys();
}
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableCatalog.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableCatalog.java
index f50e2d3..f0b675d 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableCatalog.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableCatalog.java
@@ -20,7 +20,10 @@ package org.apache.flink.table.catalog;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
import java.util.List;
@@ -132,4 +135,60 @@ public interface ReadableCatalog {
*/
boolean tableExists(ObjectPath objectPath) throws CatalogException;
+ // ------ partitions ------
+
+ /**
+ * Get CatalogPartitionSpec of all partitions of the table.
+ *
+ * @param tablePath path of the table
+ * @return a list of CatalogPartitionSpec of the table
+ *
+ * @throws TableNotExistException thrown if the table does not exist in
the catalog
+ * @throws TableNotPartitionedException thrown if the table is not
partitioned
+ * @throws CatalogException in case of any runtime exception
+ */
+ List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath)
+ throws TableNotExistException, TableNotPartitionedException,
CatalogException;
+
+ /**
+ * Get CatalogPartitionSpec of all partitions that is under the given
CatalogPartitionSpec in the table.
+ *
+ * @param tablePath path of the table
+ * @param partitionSpec the partition spec to list
+ * @return a list of CatalogPartitionSpec that is under the given
CatalogPartitionSpec in the table
+ *
+ * @throws TableNotExistException thrown if the table does not exist in
the catalog
+ * @throws TableNotPartitionedException thrown if the table is not
partitioned
+ * @throws PartitionSpecInvalidException thrown if the given partition
spec is invalid
+ * @throws CatalogException in case of any runtime exception
+ */
+ List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath,
CatalogPartitionSpec partitionSpec)
+ throws TableNotExistException, TableNotPartitionedException,
PartitionSpecInvalidException, CatalogException;
+
+ /**
+ * Get a partition of the given table.
+ * The given partition spec keys and values need to be matched exactly
for a result.
+ *
+ * @param tablePath path of the table
+ * @param partitionSpec partition spec of partition to get
+ * @return the requested partition
+ *
+ * @throws TableNotExistException thrown if the table does not exist in
the catalog
+ * @throws TableNotPartitionedException thrown if the table is not
partitioned
+ * @throws PartitionSpecInvalidException thrown if the given partition
spec is invalid,
+ * @throws PartitionNotExistException thrown if the partition is not
partitioned
+ * @throws CatalogException in case of any runtime exception
+ */
+ CatalogPartition getPartition(ObjectPath tablePath,
CatalogPartitionSpec partitionSpec)
+ throws TableNotExistException, TableNotPartitionedException,
PartitionSpecInvalidException, PartitionNotExistException, CatalogException;
+
+ /**
+ * Check whether a partition exists or not.
+ *
+ * @param tablePath path of the table
+ * @param partitionSpec partition spec of the partition to check
+ * @throws CatalogException in case of any runtime exception
+ */
+ boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec
partitionSpec) throws CatalogException;
+
}
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableWritableCatalog.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableWritableCatalog.java
index 75023cd..755ae27 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableWritableCatalog.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableWritableCatalog.java
@@ -22,8 +22,12 @@ import
org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import
org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
/**
* An interface responsible for manipulating catalog metadata.
@@ -105,9 +109,9 @@ public interface ReadableWritableCatalog extends
ReadableCatalog {
/**
* Create a new table or view.
*
- * @param tablePath Path of the table or view to be created
- * @param table The table definition
- * @param ignoreIfExists Flag to specify behavior when a table or view
already exists at the given path:
+ * @param tablePath path of the table or view to be created
+ * @param table the table definition
+ * @param ignoreIfExists flag to specify behavior when a table or view
already exists at the given path:
* if set to false, it throws a
TableAlreadyExistException,
* if set to true, do nothing.
* @throws TableAlreadyExistException if table already exists and
ignoreIfExists is false
@@ -119,10 +123,12 @@ public interface ReadableWritableCatalog extends
ReadableCatalog {
/**
* Modify an existing table or view.
+ * Note that the new and old CatalogBaseTable must be of the same type.
For example, this doesn't
+ * allow alter a regular table to partitioned table, or alter a view to
a table, and vice versa.
*
- * @param tableName Path of the table or view to be modified
- * @param newTable The new table definition
- * @param ignoreIfNotExists Flag to specify behavior when the table or
view does not exist:
+ * @param tableName path of the table or view to be modified
+ * @param newTable the new table definition
+ * @param ignoreIfNotExists flag to specify behavior when the table or
view does not exist:
* if set to false, throw an exception,
* if set to true, do nothing.
* @throws TableNotExistException if the table does not exist
@@ -131,4 +137,62 @@ public interface ReadableWritableCatalog extends
ReadableCatalog {
void alterTable(ObjectPath tableName, CatalogBaseTable newTable,
boolean ignoreIfNotExists)
throws TableNotExistException, CatalogException;
+ // ------ partitions ------
+
+ /**
+ * Create a partition.
+ *
+ * @param tablePath path of the table.
+ * @param partitionSpec partition spec of the partition
+ * @param partition the partition to add.
+ * @param ignoreIfExists flag to specify behavior if a table with the
given name already exists:
+ * if set to false, it throws a
TableAlreadyExistException,
+ * if set to true, nothing happens.
+ *
+ * @throws TableNotExistException thrown if the target table does not
exist
+ * @throws TableNotPartitionedException thrown if the target table is
not partitioned
+ * @throws PartitionSpecInvalidException thrown if the given partition
spec is invalid
+ * @throws PartitionAlreadyExistsException thrown if the target
partition already exists
+ * @throws CatalogException in case of any runtime exception
+ */
+ void createPartition(ObjectPath tablePath, CatalogPartitionSpec
partitionSpec, CatalogPartition partition, boolean ignoreIfExists)
+ throws TableNotExistException, TableNotPartitionedException,
PartitionSpecInvalidException, PartitionAlreadyExistsException,
CatalogException;
+
+ /**
+ * Drop a partition.
+ *
+ * @param tablePath path of the table.
+ * @param partitionSpec partition spec of the partition to drop
+ * @param ignoreIfNotExists flag to specify behavior if the database
does not exist:
+ * if set to false, throw an exception,
+ * if set to true, nothing happens.
+ *
+ * @throws TableNotExistException thrown if the target table does not
exist
+ * @throws TableNotPartitionedException thrown if the target table is
not partitioned
+ * @throws PartitionSpecInvalidException thrown if the given partition
spec is invalid
+ * @throws PartitionNotExistException thrown if the target partition
does not exist
+ * @throws CatalogException in case of any runtime exception
+ */
+ void dropPartition(ObjectPath tablePath, CatalogPartitionSpec
partitionSpec, boolean ignoreIfNotExists)
+ throws TableNotExistException, TableNotPartitionedException,
PartitionSpecInvalidException, PartitionNotExistException, CatalogException;
+
+ /**
+ * Alter a partition.
+ *
+ * @param tablePath path of the table
+ * @param partitionSpec partition spec of the partition
+ * @param newPartition new partition to replace the old one
+ * @param ignoreIfNotExists flag to specify behavior if the database
does not exist:
+ * if set to false, throw an exception,
+ * if set to true, nothing happens.
+ *
+ * @throws TableNotExistException thrown if the target table does not
exist
+ * @throws TableNotPartitionedException thrown if the target table is
not partitioned
+ * @throws PartitionSpecInvalidException thrown if the given partition
spec is invalid
+ * @throws PartitionNotExistException thrown if the target partition
does not exist
+ * @throws CatalogException in case of any runtime exception
+ */
+ void alterPartition(ObjectPath tablePath, CatalogPartitionSpec
partitionSpec, CatalogPartition newPartition, boolean ignoreIfNotExists)
+ throws TableNotExistException, TableNotPartitionedException,
PartitionSpecInvalidException, PartitionNotExistException, CatalogException;
+
}
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/PartitionAlreadyExistsException.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/PartitionAlreadyExistsException.java
new file mode 100644
index 0000000..dd1e5e9
--- /dev/null
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/PartitionAlreadyExistsException.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.exceptions;
+
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.ObjectPath;
+
+/**
+ * Exception for trying to create a partition that already exists.
+ */
+public class PartitionAlreadyExistsException extends Exception {
+ private static final String MSG = "Partition %s of table %s in catalog
%s already exists.";
+
+ public PartitionAlreadyExistsException(
+ String catalogName,
+ ObjectPath tablePath,
+ CatalogPartitionSpec partitionSpec) {
+
+ super(String.format(MSG, partitionSpec,
tablePath.getFullName(), catalogName));
+ }
+
+ public PartitionAlreadyExistsException(
+ String catalogName,
+ ObjectPath tablePath,
+ CatalogPartitionSpec partitionSpec,
+ Throwable cause) {
+
+ super(String.format(MSG, partitionSpec,
tablePath.getFullName(), catalogName), cause);
+ }
+}
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogTable.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/PartitionNotExistException.java
similarity index 50%
copy from
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogTable.java
copy to
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/PartitionNotExistException.java
index 3a25d8f..597f85d 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogTable.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/PartitionNotExistException.java
@@ -16,17 +16,31 @@
* limitations under the License.
*/
-package org.apache.flink.table.catalog;
+package org.apache.flink.table.catalog.exceptions;
-import org.apache.flink.table.plan.stats.TableStats;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.ObjectPath;
/**
- * Represents a table in a catalog.
+ * Exception for operation on a nonexistent partition.
*/
-public interface CatalogTable extends CatalogBaseTable {
- /**
- * Get the statistics of the table.
- * @return table statistics
- */
- TableStats getStatistics();
+public class PartitionNotExistException extends Exception {
+ private static final String MSG = "Partition %s of table %s in catalog
%s does not exist.";
+
+ public PartitionNotExistException(
+ String catalogName,
+ ObjectPath tablePath,
+ CatalogPartitionSpec partitionSpec) {
+
+ super(String.format(MSG, partitionSpec,
tablePath.getFullName(), catalogName), null);
+ }
+
+ public PartitionNotExistException(
+ String catalogName,
+ ObjectPath tablePath,
+ CatalogPartitionSpec partitionSpec,
+ Throwable cause) {
+
+ super(String.format(MSG, partitionSpec,
tablePath.getFullName(), catalogName), cause);
+ }
}
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/PartitionSpecInvalidException.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/PartitionSpecInvalidException.java
new file mode 100644
index 0000000..57e8b49
--- /dev/null
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/PartitionSpecInvalidException.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.exceptions;
+
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.ObjectPath;
+
+import java.util.List;
+
+/**
+ * Exception for invalid PartitionSpec compared with partition key list of a
partitioned Table.
+ * For example, it is thrown when the size of PartitionSpec exceeds the size
of partition key list, or
+ * when the size of PartitionSpec is 'n' but its keys don't match the first
'n' keys in partition key list.
+ */
+public class PartitionSpecInvalidException extends Exception {
+ private static final String MSG = "PartitionSpec %s does not match
partition keys %s of table %s in catalog %s.";
+
+ public PartitionSpecInvalidException(
+ String catalogName,
+ List<String> partitionKeys,
+ ObjectPath tablePath,
+ CatalogPartitionSpec partitionSpec) {
+
+ super(String.format(MSG, partitionSpec, partitionKeys,
tablePath.getFullName(), catalogName), null);
+ }
+
+ public PartitionSpecInvalidException(
+ String catalogName,
+ List<String> partitionKeys,
+ ObjectPath tablePath,
+ CatalogPartitionSpec partitionSpec,
+ Throwable cause) {
+
+ super(String.format(MSG, partitionSpec, partitionKeys,
tablePath.getFullName(), catalogName), cause);
+ }
+}
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogTable.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/TableNotPartitionedException.java
similarity index 57%
copy from
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogTable.java
copy to
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/TableNotPartitionedException.java
index 3a25d8f..7bdc30f 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogTable.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/TableNotPartitionedException.java
@@ -16,17 +16,22 @@
* limitations under the License.
*/
-package org.apache.flink.table.catalog;
+package org.apache.flink.table.catalog.exceptions;
-import org.apache.flink.table.plan.stats.TableStats;
+import org.apache.flink.table.catalog.ObjectPath;
/**
- * Represents a table in a catalog.
+ * Exception for trying to operate partition on a non-partitioned table.
*/
-public interface CatalogTable extends CatalogBaseTable {
- /**
- * Get the statistics of the table.
- * @return table statistics
- */
- TableStats getStatistics();
+public class TableNotPartitionedException extends Exception {
+
+ private static final String MSG = "Table %s in catalog %s is not
partitioned.";
+
+ public TableNotPartitionedException(String catalogName, ObjectPath
tablePath) {
+ this(catalogName, tablePath, null);
+ }
+
+ public TableNotPartitionedException(String catalogName, ObjectPath
tablePath, Throwable cause) {
+ super(String.format(MSG, tablePath.getFullName(), catalogName),
cause);
+ }
}