This is an automated email from the ASF dual-hosted git repository.
kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new af76026 [FLINK-11519][table] Add function related catalog APIs
af76026 is described below
commit af76026dd54bb331b5df881edc29298e40d2da08
Author: xuefuz <[email protected]>
AuthorDate: Fri Apr 26 07:13:31 2019 -0700
[FLINK-11519][table] Add function related catalog APIs
This closes #8275
---
.../table/catalog/GenericCatalogFunction.java | 75 +++++++++
.../table/catalog/GenericInMemoryCatalog.java | 106 ++++++++++--
.../flink/table/catalog/CatalogTestUtil.java | 5 +
.../table/catalog/GenericInMemoryCatalogTest.java | 183 +++++++++++++++++++++
.../flink/table/catalog/CatalogFunction.java | 64 +++++++
.../flink/table/catalog/ReadableCatalog.java | 33 ++++
.../table/catalog/ReadableWritableCatalog.java | 45 +++++
.../exceptions/FunctionAlreadyExistException.java | 38 +++++
.../exceptions/FunctionNotExistException.java | 38 +++++
9 files changed, 575 insertions(+), 12 deletions(-)
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogFunction.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogFunction.java
new file mode 100644
index 0000000..5c0176d
--- /dev/null
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogFunction.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * A generic catalog function implementation.
+ */
+public class GenericCatalogFunction implements CatalogFunction {
+
+ private final String className; // Fully qualified class name of the
function
+ private final Map<String, String> properties;
+
+ public GenericCatalogFunction(String className) {
+ this(className, new HashMap<>());
+ }
+
+ public GenericCatalogFunction(String className, Map<String, String>
properties) {
+ this.className = className;
+ this.properties = properties;
+ }
+
+ @Override
+ public String getClassName() {
+ return this.className;
+ }
+
+ @Override
+ public Map<String, String> getProperties() {
+ return this.properties;
+ }
+
+ @Override
+ public GenericCatalogFunction copy() {
+ return new GenericCatalogFunction(className, new
HashMap<>(properties));
+ }
+
+ @Override
+ public Optional<String> getDescription() {
+ return Optional.of("This is a user-defined function");
+ }
+
+ @Override
+ public Optional<String> getDetailedDescription() {
+ return Optional.of("This is a user-defined function in an
in-memory catalog implementation");
+ }
+
+ @Override
+ public String toString() {
+ return "GenericCatalogFunction{" +
+ ", className='" + className + '\'' +
+ ", properties=" + properties +
+ '}';
+ }
+
+}
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
index 6a4e554..b9568a5 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
@@ -22,6 +22,8 @@ import
org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
import
org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
@@ -38,6 +40,7 @@ import java.util.Map;
import java.util.stream.Collectors;
import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* A generic catalog implementation that holds all meta objects in memory.
@@ -51,6 +54,7 @@ public class GenericInMemoryCatalog implements
ReadableWritableCatalog {
private final String catalogName;
private final Map<String, CatalogDatabase> databases;
private final Map<ObjectPath, CatalogBaseTable> tables;
+ private final Map<ObjectPath, CatalogFunction> functions;
private final Map<ObjectPath, Map<CatalogPartitionSpec,
CatalogPartition>> partitions;
public GenericInMemoryCatalog(String name) {
@@ -60,6 +64,7 @@ public class GenericInMemoryCatalog implements
ReadableWritableCatalog {
this.databases = new LinkedHashMap<>();
this.databases.put(DEFAULT_DB, new GenericCatalogDatabase(new
HashMap<>()));
this.tables = new LinkedHashMap<>();
+ this.functions = new LinkedHashMap<>();
this.partitions = new LinkedHashMap<>();
}
@@ -95,7 +100,7 @@ public class GenericInMemoryCatalog implements
ReadableWritableCatalog {
public void createDatabase(String databaseName, CatalogDatabase db,
boolean ignoreIfExists)
throws DatabaseAlreadyExistException {
checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName));
- checkArgument(db != null);
+ checkNotNull(db);
if (databaseExists(databaseName)) {
if (!ignoreIfExists) {
@@ -127,15 +132,15 @@ public class GenericInMemoryCatalog implements
ReadableWritableCatalog {
private boolean isDatabaseEmpty(String databaseName) {
checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName));
- return tables.keySet().stream().noneMatch(op ->
op.getDatabaseName().equals(databaseName));
- // TODO: also check function when function is added.
+ return tables.keySet().stream().noneMatch(op ->
op.getDatabaseName().equals(databaseName)) &&
+ functions.keySet().stream().noneMatch(op ->
op.getDatabaseName().equals(databaseName));
}
@Override
public void alterDatabase(String databaseName, CatalogDatabase
newDatabase, boolean ignoreIfNotExists)
throws DatabaseNotExistException {
checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName));
- checkArgument(newDatabase != null);
+ checkNotNull(newDatabase);
if (databaseExists(databaseName)) {
databases.put(databaseName, newDatabase.copy());
@@ -172,8 +177,8 @@ public class GenericInMemoryCatalog implements
ReadableWritableCatalog {
@Override
public void createTable(ObjectPath tablePath, CatalogBaseTable table,
boolean ignoreIfExists)
throws TableAlreadyExistException, DatabaseNotExistException {
- checkArgument(tablePath != null);
- checkArgument(table != null);
+ checkNotNull(tablePath);
+ checkNotNull(table);
if (!databaseExists(tablePath.getDatabaseName())) {
throw new DatabaseNotExistException(catalogName,
tablePath.getDatabaseName());
@@ -195,8 +200,8 @@ public class GenericInMemoryCatalog implements
ReadableWritableCatalog {
@Override
public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable,
boolean ignoreIfNotExists)
throws TableNotExistException {
- checkArgument(tablePath != null);
- checkArgument(newTable != null);
+ checkNotNull(tablePath);
+ checkNotNull(newTable);
// TODO: validate the new and old CatalogBaseTable must be of
the same type. For example, this doesn't
// allow alter a regular table to partitioned
table, or alter a view to a table, and vice versa.
@@ -213,7 +218,7 @@ public class GenericInMemoryCatalog implements
ReadableWritableCatalog {
@Override
public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
throws TableNotExistException {
- checkArgument(tablePath != null);
+ checkNotNull(tablePath);
if (tableExists(tablePath)) {
tables.remove(tablePath);
@@ -227,7 +232,7 @@ public class GenericInMemoryCatalog implements
ReadableWritableCatalog {
@Override
public void renameTable(ObjectPath tablePath, String newTableName,
boolean ignoreIfNotExists)
throws TableNotExistException, TableAlreadyExistException {
- checkArgument(tablePath != null);
+ checkNotNull(tablePath);
checkArgument(!StringUtils.isNullOrWhitespaceOnly(newTableName));
if (tableExists(tablePath)) {
@@ -276,7 +281,7 @@ public class GenericInMemoryCatalog implements
ReadableWritableCatalog {
@Override
public CatalogBaseTable getTable(ObjectPath tablePath) throws
TableNotExistException {
- checkArgument(tablePath != null);
+ checkNotNull(tablePath);
if (!tableExists(tablePath)) {
throw new TableNotExistException(catalogName,
tablePath);
@@ -287,7 +292,84 @@ public class GenericInMemoryCatalog implements
ReadableWritableCatalog {
@Override
public boolean tableExists(ObjectPath tablePath) {
- return tablePath != null &&
databaseExists(tablePath.getDatabaseName()) && tables.containsKey(tablePath);
+ checkNotNull(tablePath);
+
+ return databaseExists(tablePath.getDatabaseName()) &&
tables.containsKey(tablePath);
+ }
+
+ // ------ functions ------
+
+ @Override
+ public void createFunction(ObjectPath functionPath, CatalogFunction
function, boolean ignoreIfExists)
+ throws FunctionAlreadyExistException,
DatabaseNotExistException {
+ checkNotNull(functionPath);
+ checkNotNull(function);
+
+ if (!databaseExists(functionPath.getDatabaseName())) {
+ throw new DatabaseNotExistException(catalogName,
functionPath.getDatabaseName());
+ }
+
+ if (functionExists(functionPath)) {
+ if (!ignoreIfExists) {
+ throw new
FunctionAlreadyExistException(catalogName, functionPath);
+ }
+ } else {
+ functions.put(functionPath, function.copy());
+ }
+ }
+
+ @Override
+ public void alterFunction(ObjectPath functionPath, CatalogFunction
newFunction, boolean ignoreIfNotExists)
+ throws FunctionNotExistException {
+ checkNotNull(functionPath);
+ checkNotNull(newFunction);
+
+ if (functionExists(functionPath)) {
+ functions.put(functionPath, newFunction.copy());
+ } else if (!ignoreIfNotExists) {
+ throw new FunctionNotExistException(catalogName,
functionPath);
+ }
+ }
+
+ @Override
+ public void dropFunction(ObjectPath functionPath, boolean
ignoreIfNotExists) throws FunctionNotExistException {
+ checkNotNull(functionPath);
+
+ if (functionExists(functionPath)) {
+ functions.remove(functionPath);
+ } else if (!ignoreIfNotExists) {
+ throw new FunctionNotExistException(catalogName,
functionPath);
+ }
+ }
+
+ @Override
+ public List<String> listFunctions(String databaseName) throws
DatabaseNotExistException {
+
checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName), "databaseName
cannot be null or empty");
+
+ if (!databaseExists(databaseName)) {
+ throw new DatabaseNotExistException(catalogName,
databaseName);
+ }
+
+ return functions.keySet().stream()
+ .filter(k ->
k.getDatabaseName().equals(databaseName)).map(k -> k.getObjectName())
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public CatalogFunction getFunction(ObjectPath functionPath) throws
FunctionNotExistException {
+ checkNotNull(functionPath);
+
+ if (!functionExists(functionPath)) {
+ throw new FunctionNotExistException(catalogName,
functionPath);
+ } else {
+ return functions.get(functionPath).copy();
+ }
+ }
+
+ @Override
+ public boolean functionExists(ObjectPath functionPath) {
+ checkNotNull(functionPath);
+ return databaseExists(functionPath.getDatabaseName()) &&
functions.containsKey(functionPath);
}
// ------ partitions ------
diff --git
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java
index 9974272..33d1ea1 100644
---
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java
+++
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java
@@ -87,6 +87,11 @@ public class CatalogTestUtil {
assertEquals(d1.getProperties(), d2.getProperties());
}
+ protected static void checkEquals(CatalogFunction f1, CatalogFunction
f2) {
+ assertEquals(f1.getClassName(), f2.getClassName());
+ assertEquals(f1.getProperties(), f2.getProperties());
+ }
+
protected static void checkEquals(CatalogPartition p1, CatalogPartition
p2) {
assertEquals(p1.getProperties(), p2.getProperties());
}
diff --git
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
index ad9fd92..d96d8b5 100644
---
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
+++
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
@@ -24,12 +24,15 @@ import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
import
org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.functions.ScalarFunction;
import org.junit.After;
import org.junit.Before;
@@ -94,6 +97,9 @@ public class GenericInMemoryCatalogTest {
if (catalog.tableExists(path3)) {
catalog.dropTable(path3, true);
}
+ if (catalog.functionExists(path1)) {
+ catalog.dropFunction(path1, true);
+ }
if (catalog.databaseExists(db1)) {
catalog.dropDatabase(db1, true);
}
@@ -946,6 +952,158 @@ public class GenericInMemoryCatalogTest {
assertFalse(catalog.partitionExists(ObjectPath.fromString("non.exist"),
createPartitionSpec()));
}
+ // ------ functions ------
+
+ @Test
+ public void testCreateFunction() throws Exception {
+ catalog.createDatabase(db1, createDb(), false);
+
+ assertFalse(catalog.functionExists(path1));
+
+ catalog.createFunction(path1, createFunction(), false);
+
+ assertTrue(catalog.functionExists(path1));
+
+ catalog.dropFunction(path1, false);
+ catalog.dropDatabase(db1, false);
+ }
+
+ @Test
+ public void testCreateFunction_DatabaseNotExistException() throws
Exception {
+ assertFalse(catalog.databaseExists(db1));
+
+ exception.expect(DatabaseNotExistException.class);
+ exception.expectMessage("Database db1 does not exist in
Catalog");
+ catalog.createFunction(path1, createFunction(), false);
+ }
+
+ @Test
+ public void testCreateFunction_FunctionAlreadyExistException() throws
Exception {
+ catalog.createDatabase(db1, createDb(), false);
+ catalog.createFunction(path1, createFunction(), false);
+
+ exception.expect(FunctionAlreadyExistException.class);
+ exception.expectMessage("Function db1.t1 already exists in
Catalog");
+ catalog.createFunction(path1, createFunction(), false);
+ }
+
+ @Test
+ public void testCreateFunction_FunctionAlreadyExist_ignored() throws
Exception {
+ catalog.createDatabase(db1, createDb(), false);
+
+ CatalogFunction func = createFunction();
+ catalog.createFunction(path1, func, false);
+
+ CatalogTestUtil.checkEquals(func, catalog.getFunction(path1));
+
+ catalog.createFunction(path1, createAnotherFunction(), true);
+
+ CatalogTestUtil.checkEquals(func, catalog.getFunction(path1));
+
+ catalog.dropFunction(path1, false);
+ catalog.dropDatabase(db1, false);
+ }
+
+ @Test
+ public void testAlterFunction() throws Exception {
+ catalog.createDatabase(db1, createDb(), false);
+
+ CatalogFunction func = createFunction();
+ catalog.createFunction(path1, func, false);
+
+ CatalogTestUtil.checkEquals(func, catalog.getFunction(path1));
+
+ CatalogFunction newFunc = createAnotherFunction();
+ catalog.alterFunction(path1, newFunc, false);
+
+ assertNotEquals(func, catalog.getFunction(path1));
+ CatalogTestUtil.checkEquals(newFunc,
catalog.getFunction(path1));
+
+ catalog.dropFunction(path1, false);
+ catalog.dropDatabase(db1, false);
+ }
+
+ @Test
+ public void testAlterFunction_FunctionNotExistException() throws
Exception {
+ exception.expect(FunctionNotExistException.class);
+ exception.expectMessage("Function db1.nonexist does not exist
in Catalog");
+ catalog.alterFunction(nonExistObjectPath, createFunction(),
false);
+ }
+
+ @Test
+ public void testAlterFunction_FunctionNotExist_ignored() throws
Exception {
+ catalog.createDatabase(db1, createDb(), false);
+ catalog.alterFunction(nonExistObjectPath, createFunction(),
true);
+
+ assertFalse(catalog.functionExists(nonExistObjectPath));
+
+ catalog.dropDatabase(db1, false);
+ }
+
+ @Test
+ public void testListFunctions() throws Exception {
+ catalog.createDatabase(db1, createDb(), false);
+
+ CatalogFunction func = createFunction();
+ catalog.createFunction(path1, func, false);
+
+ assertEquals(path1.getObjectName(),
catalog.listFunctions(db1).get(0));
+
+ catalog.dropFunction(path1, false);
+ catalog.dropDatabase(db1, false);
+ }
+
+ @Test
+ public void testListFunctions_DatabaseNotExistException() throws
Exception{
+ exception.expect(DatabaseNotExistException.class);
+ exception.expectMessage("Database db1 does not exist in
Catalog");
+ catalog.listFunctions(db1);
+ }
+
+ @Test
+ public void testGetFunction_FunctionNotExistException() throws
Exception {
+ catalog.createDatabase(db1, createDb(), false);
+
+ exception.expect(FunctionNotExistException.class);
+ exception.expectMessage("Function db1.nonexist does not exist
in Catalog");
+ catalog.getFunction(nonExistObjectPath);
+ }
+
+ @Test
+ public void testGetFunction_FunctionNotExistException_NoDb() throws
Exception {
+ exception.expect(FunctionNotExistException.class);
+ exception.expectMessage("Function db1.nonexist does not exist
in Catalog");
+ catalog.getFunction(nonExistObjectPath);
+ }
+
+ @Test
+ public void testDropFunction() throws Exception {
+ catalog.createDatabase(db1, createDb(), false);
+ catalog.createFunction(path1, createFunction(), false);
+
+ assertTrue(catalog.functionExists(path1));
+
+ catalog.dropFunction(path1, false);
+
+ assertFalse(catalog.functionExists(path1));
+
+ catalog.dropDatabase(db1, false);
+ }
+
+ @Test
+ public void testDropFunction_FunctionNotExistException() throws
Exception {
+ exception.expect(FunctionNotExistException.class);
+ exception.expectMessage("Function non.exist does not exist in
Catalog");
+ catalog.dropFunction(nonExistDbPath, false);
+ }
+
+ @Test
+ public void testDropFunction_FunctionNotExist_ignored() throws
Exception {
+ catalog.createDatabase(db1, createDb(), false);
+ catalog.dropFunction(nonExistObjectPath, true);
+ catalog.dropDatabase(db1, false);
+ }
+
// ------ utilities ------
private GenericCatalogTable createStreamingTable() {
@@ -1092,4 +1250,29 @@ public class GenericInMemoryCatalogTest {
"This is another view");
}
+ protected CatalogFunction createFunction() {
+ return new
GenericCatalogFunction(MyScalarFunction.class.getName());
+ }
+
+ protected CatalogFunction createAnotherFunction() {
+ return new
GenericCatalogFunction(MyOtherScalarFunction.class.getName());
+ }
+
+ /**
+ * Test UDF.
+ */
+ public static class MyScalarFunction extends ScalarFunction {
+ public Integer eval(Integer i) {
+ return i + 1;
+ }
+ }
+
+ /**
+ * Test UDF.
+ */
+ public static class MyOtherScalarFunction extends ScalarFunction {
+ public String eval(Integer i) {
+ return String.valueOf(i);
+ }
+ }
}
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogFunction.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogFunction.java
new file mode 100644
index 0000000..9dbb791
--- /dev/null
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogFunction.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Interface for a function in a catalog.
+ */
+public interface CatalogFunction {
+
+ /**
+ * Get the full name of the class backing the function.
+ *
+ * @return the full name of the class
+ */
+ String getClassName();
+
+ /**
+ * Get the properties of the function.
+ *
+ * @return the properties of the function
+ */
+ Map<String, String> getProperties();
+
+ /**
+ * Create a deep copy of the function.
+ *
+ * @return a deep copy of "this" instance
+ */
+ CatalogFunction copy();
+
+ /**
+ * Get a brief description of the function.
+ *
+ * @return an optional short description of the function
+ */
+ Optional<String> getDescription();
+
+ /**
+ * Get a detailed description of the function.
+ *
+ * @return an optional long description of the function
+ */
+ Optional<String> getDetailedDescription();
+
+}
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableCatalog.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableCatalog.java
index f0b675d..1b08736 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableCatalog.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableCatalog.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.catalog;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
@@ -191,4 +192,36 @@ public interface ReadableCatalog {
*/
boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec
partitionSpec) throws CatalogException;
+ // ------ functions ------
+
+ /**
+ * List the names of all functions in the given database. An empty list
is returned if none is registered.
+ *
+ * @param dbName name of the database.
+ * @return a list of the names of the functions in this database
+ * @throws DatabaseNotExistException if the database does not exist
+ * @throws CatalogException in case of any runtime exception
+ */
+ List<String> listFunctions(String dbName) throws
DatabaseNotExistException, CatalogException;
+
+ /**
+ * Get the function.
+ *
+ * @param functionPath path of the function
+ * @return the requested function
+ * @throws FunctionNotExistException if the function does not exist in
the catalog
+ * @throws CatalogException in case of any runtime exception
+ */
+ CatalogFunction getFunction(ObjectPath functionPath) throws
FunctionNotExistException, CatalogException;
+
+ /**
+ * Check whether a function exists or not.
+ *
+ * @param functionPath path of the function
+ * @return true if the function exists in the catalog
+ * false otherwise
+ * @throws CatalogException in case of any runtime exception
+ */
+ boolean functionExists(ObjectPath functionPath) throws CatalogException;
+
}
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableWritableCatalog.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableWritableCatalog.java
index 755ae27..035b049 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableWritableCatalog.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableWritableCatalog.java
@@ -22,6 +22,8 @@ import
org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
import
org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
@@ -195,4 +197,47 @@ public interface ReadableWritableCatalog extends
ReadableCatalog {
void alterPartition(ObjectPath tablePath, CatalogPartitionSpec
partitionSpec, CatalogPartition newPartition, boolean ignoreIfNotExists)
throws TableNotExistException, TableNotPartitionedException,
PartitionSpecInvalidException, PartitionNotExistException, CatalogException;
+ // ------ functions ------
+
+ /**
+ * Create a function.
+ *
+ * @param functionPath path of the function
+ * @param function the function to be created
+ * @param ignoreIfExists flag to specify behavior if a function with
the given name already exists:
+ * if set to false, it throws a
FunctionAlreadyExistException,
+ * if set to true, nothing happens.
+ * @throws FunctionAlreadyExistException if the function already exist
+ * @throws DatabaseNotExistException if the given database does not
exist
+ * @throws CatalogException in case of any runtime exception
+ */
+ void createFunction(ObjectPath functionPath, CatalogFunction function,
boolean ignoreIfExists)
+ throws FunctionAlreadyExistException,
DatabaseNotExistException, CatalogException;
+
+ /**
+ * Modify an existing function.
+ *
+ * @param functionPath path of the function
+ * @param newFunction the function to be modified
+ * @param ignoreIfNotExists flag to specify behavior if the function
does not exist:
+ * if set to false, throw an exception
+ * if set to true, nothing happens
+ * @throws FunctionNotExistException if the function does not exist
+ * @throws CatalogException in case of any runtime exception
+ */
+ void alterFunction(ObjectPath functionPath, CatalogFunction
newFunction, boolean ignoreIfNotExists)
+ throws FunctionNotExistException, CatalogException;
+
+ /**
+ * Drop a function.
+ *
+ * @param functionPath path of the function to be dropped
+ * @param ignoreIfNotExists plag to specify behavior if the function
does not exist:
+ * if set to false, throw an exception
+ * if set to true, nothing happens
+ * @throws FunctionNotExistException if the function does not exist
+ * @throws CatalogException in case of any runtime exception
+ */
+ void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists)
throws FunctionNotExistException,
+ CatalogException;
}
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/FunctionAlreadyExistException.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/FunctionAlreadyExistException.java
new file mode 100644
index 0000000..c839133
--- /dev/null
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/FunctionAlreadyExistException.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.exceptions;
+
+import org.apache.flink.table.catalog.ObjectPath;
+
+/**
+ * Exception for trying to create a function that already exists.
+ */
+public class FunctionAlreadyExistException extends Exception {
+
+ private static final String MSG = "Function %s already exists in
Catalog %s.";
+
+ public FunctionAlreadyExistException(String catalogName, ObjectPath
functionPath) {
+ this(catalogName, functionPath, null);
+ }
+
+ public FunctionAlreadyExistException(String catalogName, ObjectPath
functionPath, Throwable cause) {
+ super(String.format(MSG, functionPath.getFullName(),
catalogName), cause);
+ }
+
+}
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/FunctionNotExistException.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/FunctionNotExistException.java
new file mode 100644
index 0000000..063f60f
--- /dev/null
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/FunctionNotExistException.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.exceptions;
+
+import org.apache.flink.table.catalog.ObjectPath;
+
+/**
+ * Exception for trying to operate on a function that doesn't exist.
+ */
+public class FunctionNotExistException extends Exception {
+
+ private static final String MSG = "Function %s does not exist in
Catalog %s.";
+
+ public FunctionNotExistException(String catalogName, ObjectPath
functionPath) {
+ this(catalogName, functionPath, null);
+ }
+
+ public FunctionNotExistException(String catalogName, ObjectPath
functionPath, Throwable cause) {
+ super(String.format(MSG, functionPath.getFullName(),
catalogName), cause);
+ }
+
+}