This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new af76026  [FLINK-11519][table] Add function related catalog APIs
af76026 is described below

commit af76026dd54bb331b5df881edc29298e40d2da08
Author: xuefuz <[email protected]>
AuthorDate: Fri Apr 26 07:13:31 2019 -0700

    [FLINK-11519][table] Add function related catalog APIs
    
    This closes #8275
---
 .../table/catalog/GenericCatalogFunction.java      |  75 +++++++++
 .../table/catalog/GenericInMemoryCatalog.java      | 106 ++++++++++--
 .../flink/table/catalog/CatalogTestUtil.java       |   5 +
 .../table/catalog/GenericInMemoryCatalogTest.java  | 183 +++++++++++++++++++++
 .../flink/table/catalog/CatalogFunction.java       |  64 +++++++
 .../flink/table/catalog/ReadableCatalog.java       |  33 ++++
 .../table/catalog/ReadableWritableCatalog.java     |  45 +++++
 .../exceptions/FunctionAlreadyExistException.java  |  38 +++++
 .../exceptions/FunctionNotExistException.java      |  38 +++++
 9 files changed, 575 insertions(+), 12 deletions(-)

diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogFunction.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogFunction.java
new file mode 100644
index 0000000..5c0176d
--- /dev/null
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogFunction.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * A generic catalog function implementation.
+ *
+ * <p>A function is described by the fully qualified name of its backing class
+ * and a map of string properties.
+ */
+public class GenericCatalogFunction implements CatalogFunction {
+
+       private final String className; // Fully qualified class name of the function
+       private final Map<String, String> properties;
+
+       /**
+        * Creates a function with the given class name and an empty property map.
+        *
+        * @param className fully qualified class name of the function
+        */
+       public GenericCatalogFunction(String className) {
+               this(className, new HashMap<>());
+       }
+
+       /**
+        * Creates a function with the given class name and properties.
+        *
+        * <p>The given map is stored as is; it is not copied here.
+        *
+        * @param className  fully qualified class name of the function
+        * @param properties properties of the function
+        */
+       public GenericCatalogFunction(String className, Map<String, String> properties) {
+               this.className = className;
+               this.properties = properties;
+       }
+
+       @Override
+       public String getClassName() {
+               return this.className;
+       }
+
+       @Override
+       public Map<String, String> getProperties() {
+               return this.properties;
+       }
+
+       /**
+        * Returns a new instance with the same class name and a fresh {@link HashMap}
+        * holding the same property entries.
+        */
+       @Override
+       public GenericCatalogFunction copy() {
+               return new GenericCatalogFunction(className, new HashMap<>(properties));
+       }
+
+       @Override
+       public Optional<String> getDescription() {
+               return Optional.of("This is a user-defined function");
+       }
+
+       @Override
+       public Optional<String> getDetailedDescription() {
+               return Optional.of("This is a user-defined function in an in-memory catalog implementation");
+       }
+
+       @Override
+       public String toString() {
+               // No separator before the first field, so the output reads
+               // "GenericCatalogFunction{className='...', properties=...}".
+               return "GenericCatalogFunction{" +
+                       "className='" + className + '\'' +
+                       ", properties=" + properties +
+                       '}';
+       }
+
+}
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
index 6a4e554..b9568a5 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
@@ -22,6 +22,8 @@ import 
org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
 import 
org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
 import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
 import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
@@ -38,6 +40,7 @@ import java.util.Map;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * A generic catalog implementation that holds all meta objects in memory.
@@ -51,6 +54,7 @@ public class GenericInMemoryCatalog implements 
ReadableWritableCatalog {
        private final String catalogName;
        private final Map<String, CatalogDatabase> databases;
        private final Map<ObjectPath, CatalogBaseTable> tables;
+       private final Map<ObjectPath, CatalogFunction> functions;
        private final Map<ObjectPath, Map<CatalogPartitionSpec, 
CatalogPartition>> partitions;
 
        public GenericInMemoryCatalog(String name) {
@@ -60,6 +64,7 @@ public class GenericInMemoryCatalog implements 
ReadableWritableCatalog {
                this.databases = new LinkedHashMap<>();
                this.databases.put(DEFAULT_DB, new GenericCatalogDatabase(new 
HashMap<>()));
                this.tables = new LinkedHashMap<>();
+               this.functions = new LinkedHashMap<>();
                this.partitions = new LinkedHashMap<>();
        }
 
@@ -95,7 +100,7 @@ public class GenericInMemoryCatalog implements 
ReadableWritableCatalog {
        public void createDatabase(String databaseName, CatalogDatabase db, 
boolean ignoreIfExists)
                throws DatabaseAlreadyExistException {
                
checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName));
-               checkArgument(db != null);
+               checkNotNull(db);
 
                if (databaseExists(databaseName)) {
                        if (!ignoreIfExists) {
@@ -127,15 +132,15 @@ public class GenericInMemoryCatalog implements 
ReadableWritableCatalog {
        private boolean isDatabaseEmpty(String databaseName) {
                
checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName));
 
-               return tables.keySet().stream().noneMatch(op -> 
op.getDatabaseName().equals(databaseName));
-               // TODO: also check function when function is added.
+               return tables.keySet().stream().noneMatch(op -> 
op.getDatabaseName().equals(databaseName)) &&
+                       functions.keySet().stream().noneMatch(op -> 
op.getDatabaseName().equals(databaseName));
        }
 
        @Override
        public void alterDatabase(String databaseName, CatalogDatabase 
newDatabase, boolean ignoreIfNotExists)
                throws DatabaseNotExistException {
                
checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName));
-               checkArgument(newDatabase != null);
+               checkNotNull(newDatabase);
 
                if (databaseExists(databaseName)) {
                        databases.put(databaseName, newDatabase.copy());
@@ -172,8 +177,8 @@ public class GenericInMemoryCatalog implements 
ReadableWritableCatalog {
        @Override
        public void createTable(ObjectPath tablePath, CatalogBaseTable table, 
boolean ignoreIfExists)
                throws TableAlreadyExistException, DatabaseNotExistException {
-               checkArgument(tablePath != null);
-               checkArgument(table != null);
+               checkNotNull(tablePath);
+               checkNotNull(table);
 
                if (!databaseExists(tablePath.getDatabaseName())) {
                        throw new DatabaseNotExistException(catalogName, 
tablePath.getDatabaseName());
@@ -195,8 +200,8 @@ public class GenericInMemoryCatalog implements 
ReadableWritableCatalog {
        @Override
        public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, 
boolean ignoreIfNotExists)
                throws TableNotExistException {
-               checkArgument(tablePath != null);
-               checkArgument(newTable != null);
+               checkNotNull(tablePath);
+               checkNotNull(newTable);
 
                // TODO: validate the new and old CatalogBaseTable must be of 
the same type. For example, this doesn't
                //              allow alter a regular table to partitioned 
table, or alter a view to a table, and vice versa.
@@ -213,7 +218,7 @@ public class GenericInMemoryCatalog implements 
ReadableWritableCatalog {
 
        @Override
        public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists) 
throws TableNotExistException {
-               checkArgument(tablePath != null);
+               checkNotNull(tablePath);
 
                if (tableExists(tablePath)) {
                        tables.remove(tablePath);
@@ -227,7 +232,7 @@ public class GenericInMemoryCatalog implements 
ReadableWritableCatalog {
        @Override
        public void renameTable(ObjectPath tablePath, String newTableName, 
boolean ignoreIfNotExists)
                throws TableNotExistException, TableAlreadyExistException {
-               checkArgument(tablePath != null);
+               checkNotNull(tablePath);
                
checkArgument(!StringUtils.isNullOrWhitespaceOnly(newTableName));
 
                if (tableExists(tablePath)) {
@@ -276,7 +281,7 @@ public class GenericInMemoryCatalog implements 
ReadableWritableCatalog {
 
        @Override
        public CatalogBaseTable getTable(ObjectPath tablePath) throws 
TableNotExistException {
-               checkArgument(tablePath != null);
+               checkNotNull(tablePath);
 
                if (!tableExists(tablePath)) {
                        throw new TableNotExistException(catalogName, 
tablePath);
@@ -287,7 +292,84 @@ public class GenericInMemoryCatalog implements 
ReadableWritableCatalog {
 
        @Override
        public boolean tableExists(ObjectPath tablePath) {
-               return tablePath != null && 
databaseExists(tablePath.getDatabaseName()) && tables.containsKey(tablePath);
+               checkNotNull(tablePath);
+
+               return databaseExists(tablePath.getDatabaseName()) && 
tables.containsKey(tablePath);
+       }
+
+       // ------ functions ------
+
+       /**
+        * Stores a copy of the given function under the given path.
+        *
+        * <p>If a function is already registered under the path, nothing is stored;
+        * an exception is thrown unless {@code ignoreIfExists} is set.
+        */
+       @Override
+       public void createFunction(ObjectPath functionPath, CatalogFunction function, boolean ignoreIfExists)
+                       throws FunctionAlreadyExistException, DatabaseNotExistException {
+               checkNotNull(functionPath);
+               checkNotNull(function);
+
+               final String dbName = functionPath.getDatabaseName();
+               if (!databaseExists(dbName)) {
+                       throw new DatabaseNotExistException(catalogName, dbName);
+               }
+
+               // Happy path: the name is free, register a private copy.
+               if (!functionExists(functionPath)) {
+                       functions.put(functionPath, function.copy());
+                       return;
+               }
+
+               if (!ignoreIfExists) {
+                       throw new FunctionAlreadyExistException(catalogName, functionPath);
+               }
+       }
+
+       /**
+        * Replaces the function registered under the given path with a copy of
+        * {@code newFunction}.
+        *
+        * <p>If no function is registered under the path, nothing is changed; an
+        * exception is thrown unless {@code ignoreIfNotExists} is set.
+        */
+       @Override
+       public void alterFunction(ObjectPath functionPath, CatalogFunction newFunction, boolean ignoreIfNotExists)
+                       throws FunctionNotExistException {
+               checkNotNull(functionPath);
+               checkNotNull(newFunction);
+
+               if (!functionExists(functionPath)) {
+                       if (ignoreIfNotExists) {
+                               return;
+                       }
+                       throw new FunctionNotExistException(catalogName, functionPath);
+               }
+
+               functions.put(functionPath, newFunction.copy());
+       }
+
+       /**
+        * Removes the function registered under the given path.
+        *
+        * <p>If no function is registered under the path, nothing is removed; an
+        * exception is thrown unless {@code ignoreIfNotExists} is set.
+        */
+       @Override
+       public void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists) throws FunctionNotExistException {
+               checkNotNull(functionPath);
+
+               if (!functionExists(functionPath)) {
+                       if (ignoreIfNotExists) {
+                               return;
+                       }
+                       throw new FunctionNotExistException(catalogName, functionPath);
+               }
+
+               functions.remove(functionPath);
+       }
+
+       /**
+        * Collects the object names of all registered functions whose path points
+        * into the given database.
+        */
+       @Override
+       public List<String> listFunctions(String databaseName) throws DatabaseNotExistException {
+               checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName), "databaseName cannot be null or empty");
+
+               if (!databaseExists(databaseName)) {
+                       throw new DatabaseNotExistException(catalogName, databaseName);
+               }
+
+               return functions.keySet().stream()
+                       .filter(path -> path.getDatabaseName().equals(databaseName))
+                       .map(ObjectPath::getObjectName)
+                       .collect(Collectors.toList());
+       }
+
+       /**
+        * Returns a copy of the function registered under the given path, or throws
+        * if no such function is registered.
+        */
+       @Override
+       public CatalogFunction getFunction(ObjectPath functionPath) throws FunctionNotExistException {
+               checkNotNull(functionPath);
+
+               if (!functionExists(functionPath)) {
+                       throw new FunctionNotExistException(catalogName, functionPath);
+               }
+
+               // Hand out a copy so callers cannot modify the stored instance.
+               final CatalogFunction stored = functions.get(functionPath);
+               return stored.copy();
+       }
+
+       /**
+        * Returns true only if the database named by the path exists and a function
+        * is registered under the path.
+        */
+       @Override
+       public boolean functionExists(ObjectPath functionPath) {
+               checkNotNull(functionPath);
+
+               final String dbName = functionPath.getDatabaseName();
+               return databaseExists(dbName) && functions.containsKey(functionPath);
        }
 
        // ------ partitions ------
diff --git 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java
 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java
index 9974272..33d1ea1 100644
--- 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java
+++ 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogTestUtil.java
@@ -87,6 +87,11 @@ public class CatalogTestUtil {
                assertEquals(d1.getProperties(), d2.getProperties());
        }
 
+       /**
+        * Asserts that two functions have equal class names and equal properties.
+        */
+       protected static void checkEquals(CatalogFunction f1, CatalogFunction f2) {
+               final String expectedClassName = f1.getClassName();
+               final String actualClassName = f2.getClassName();
+               assertEquals(expectedClassName, actualClassName);
+
+               assertEquals(f1.getProperties(), f2.getProperties());
+       }
+
        protected static void checkEquals(CatalogPartition p1, CatalogPartition 
p2) {
                assertEquals(p1.getProperties(), p2.getProperties());
        }
diff --git 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
index ad9fd92..d96d8b5 100644
--- 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
+++ 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java
@@ -24,12 +24,15 @@ import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
 import 
org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
 import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
 import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
 import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.functions.ScalarFunction;
 
 import org.junit.After;
 import org.junit.Before;
@@ -94,6 +97,9 @@ public class GenericInMemoryCatalogTest {
                if (catalog.tableExists(path3)) {
                        catalog.dropTable(path3, true);
                }
+               if (catalog.functionExists(path1)) {
+                       catalog.dropFunction(path1, true);
+               }
                if (catalog.databaseExists(db1)) {
                        catalog.dropDatabase(db1, true);
                }
@@ -946,6 +952,158 @@ public class GenericInMemoryCatalogTest {
                
assertFalse(catalog.partitionExists(ObjectPath.fromString("non.exist"), 
createPartitionSpec()));
        }
 
+       // ------ functions ------
+
+       /**
+        * A function created in an existing database becomes visible through
+        * functionExists.
+        */
+       @Test
+       public void testCreateFunction() throws Exception {
+               catalog.createDatabase(db1, createDb(), false);
+               assertFalse(catalog.functionExists(path1));
+
+               final CatalogFunction function = createFunction();
+               catalog.createFunction(path1, function, false);
+               assertTrue(catalog.functionExists(path1));
+
+               // Clean up.
+               catalog.dropFunction(path1, false);
+               catalog.dropDatabase(db1, false);
+       }
+
+       /**
+        * Creating a function while db1 does not exist must fail with a
+        * DatabaseNotExistException.
+        */
+       @Test
+       public void testCreateFunction_DatabaseNotExistException() throws Exception {
+               assertFalse(catalog.databaseExists(db1));
+
+               // Expectations must be registered before the failing call.
+               exception.expect(DatabaseNotExistException.class);
+               exception.expectMessage("Database db1 does not exist in Catalog");
+
+               final CatalogFunction function = createFunction();
+               catalog.createFunction(path1, function, false);
+       }
+
+       /**
+        * Creating a function twice under the same path without the ignore flag
+        * must fail with a FunctionAlreadyExistException.
+        */
+       @Test
+       public void testCreateFunction_FunctionAlreadyExistException() throws Exception {
+               catalog.createDatabase(db1, createDb(), false);
+
+               final CatalogFunction first = createFunction();
+               catalog.createFunction(path1, first, false);
+
+               // Expectations must be registered before the failing call.
+               exception.expect(FunctionAlreadyExistException.class);
+               exception.expectMessage("Function db1.t1 already exists in Catalog");
+
+               final CatalogFunction duplicate = createFunction();
+               catalog.createFunction(path1, duplicate, false);
+       }
+
+       /**
+        * Creating a function under an occupied path with the ignore flag set
+        * leaves the originally registered function in place.
+        */
+       @Test
+       public void testCreateFunction_FunctionAlreadyExist_ignored() throws Exception {
+               catalog.createDatabase(db1, createDb(), false);
+
+               final CatalogFunction original = createFunction();
+               catalog.createFunction(path1, original, false);
+               CatalogTestUtil.checkEquals(original, catalog.getFunction(path1));
+
+               // Second create is ignored, so the stored function is unchanged.
+               final CatalogFunction other = createAnotherFunction();
+               catalog.createFunction(path1, other, true);
+               CatalogTestUtil.checkEquals(original, catalog.getFunction(path1));
+
+               // Clean up.
+               catalog.dropFunction(path1, false);
+               catalog.dropDatabase(db1, false);
+       }
+
+       /**
+        * Altering an existing function replaces what getFunction returns for
+        * its path.
+        */
+       @Test
+       public void testAlterFunction() throws Exception {
+               catalog.createDatabase(db1, createDb(), false);
+
+               CatalogFunction func = createFunction();
+               catalog.createFunction(path1, func, false);
+
+               CatalogTestUtil.checkEquals(func, catalog.getFunction(path1));
+
+               CatalogFunction newFunc = createAnotherFunction();
+               catalog.alterFunction(path1, newFunc, false);
+
+               // Compare class names rather than instances: GenericCatalogFunction does not
+               // override equals() and getFunction returns a copy, so an instance comparison
+               // could never fail.
+               assertNotEquals(func.getClassName(), catalog.getFunction(path1).getClassName());
+               CatalogTestUtil.checkEquals(newFunc, catalog.getFunction(path1));
+
+               catalog.dropFunction(path1, false);
+               catalog.dropDatabase(db1, false);
+       }
+
+       /**
+        * Altering a function that is not registered, without the ignore flag,
+        * must fail with a FunctionNotExistException.
+        */
+       @Test
+       public void testAlterFunction_FunctionNotExistException() throws Exception {
+               // Expectations must be registered before the failing call.
+               exception.expect(FunctionNotExistException.class);
+               exception.expectMessage("Function db1.nonexist does not exist in Catalog");
+
+               final CatalogFunction function = createFunction();
+               catalog.alterFunction(nonExistObjectPath, function, false);
+       }
+
+       /**
+        * Altering a function that is not registered, with the ignore flag set,
+        * neither throws nor registers the function.
+        */
+       @Test
+       public void testAlterFunction_FunctionNotExist_ignored() throws Exception {
+               catalog.createDatabase(db1, createDb(), false);
+
+               final CatalogFunction function = createFunction();
+               catalog.alterFunction(nonExistObjectPath, function, true);
+               assertFalse(catalog.functionExists(nonExistObjectPath));
+
+               // Clean up.
+               catalog.dropDatabase(db1, false);
+       }
+
+       /**
+        * Listing the functions of a database that holds a single function yields
+        * exactly that function's object name.
+        */
+       @Test
+       public void testListFunctions() throws Exception {
+               catalog.createDatabase(db1, createDb(), false);
+
+               CatalogFunction func = createFunction();
+               catalog.createFunction(path1, func, false);
+
+               // Check the size first so that an empty or over-full list fails as an
+               // assertion instead of an IndexOutOfBoundsException or going unnoticed.
+               assertEquals(1, catalog.listFunctions(db1).size());
+               assertEquals(path1.getObjectName(), catalog.listFunctions(db1).get(0));
+
+               catalog.dropFunction(path1, false);
+               catalog.dropDatabase(db1, false);
+       }
+
+       /**
+        * Listing functions without creating db1 first must fail with a
+        * DatabaseNotExistException.
+        */
+       @Test
+       public void testListFunctions_DatabaseNotExistException() throws Exception {
+               // Expectations must be registered before the failing call.
+               exception.expect(DatabaseNotExistException.class);
+               exception.expectMessage("Database db1 does not exist in Catalog");
+
+               catalog.listFunctions(db1);
+       }
+
+       /**
+        * Getting a function that is not registered in an existing database must
+        * fail with a FunctionNotExistException.
+        */
+       @Test
+       public void testGetFunction_FunctionNotExistException() throws Exception {
+               catalog.createDatabase(db1, createDb(), false);
+
+               // Expectations must be registered before the failing call.
+               exception.expect(FunctionNotExistException.class);
+               exception.expectMessage("Function db1.nonexist does not exist in Catalog");
+
+               catalog.getFunction(nonExistObjectPath);
+       }
+
+       /**
+        * Getting a function without creating db1 first must also fail with a
+        * FunctionNotExistException.
+        */
+       @Test
+       public void testGetFunction_FunctionNotExistException_NoDb() throws Exception {
+               // Expectations must be registered before the failing call.
+               exception.expect(FunctionNotExistException.class);
+               exception.expectMessage("Function db1.nonexist does not exist in Catalog");
+
+               catalog.getFunction(nonExistObjectPath);
+       }
+
+       /**
+        * A dropped function is no longer reported by functionExists.
+        */
+       @Test
+       public void testDropFunction() throws Exception {
+               catalog.createDatabase(db1, createDb(), false);
+
+               final CatalogFunction function = createFunction();
+               catalog.createFunction(path1, function, false);
+               assertTrue(catalog.functionExists(path1));
+
+               catalog.dropFunction(path1, false);
+               assertFalse(catalog.functionExists(path1));
+
+               // Clean up.
+               catalog.dropDatabase(db1, false);
+       }
+
+       /**
+        * Dropping the function at nonExistDbPath without the ignore flag must
+        * fail with a FunctionNotExistException.
+        */
+       @Test
+       public void testDropFunction_FunctionNotExistException() throws Exception {
+               // Expectations must be registered before the failing call.
+               exception.expect(FunctionNotExistException.class);
+               exception.expectMessage("Function non.exist does not exist in Catalog");
+
+               catalog.dropFunction(nonExistDbPath, false);
+       }
+
+       /**
+        * Dropping a function that is not registered, with the ignore flag set,
+        * does not throw.
+        */
+       @Test
+       public void testDropFunction_FunctionNotExist_ignored() throws Exception {
+               catalog.createDatabase(db1, createDb(), false);
+
+               // Must complete silently because the ignore flag is set.
+               catalog.dropFunction(nonExistObjectPath, true);
+
+               // Clean up.
+               catalog.dropDatabase(db1, false);
+       }
+
        // ------ utilities ------
 
        private GenericCatalogTable createStreamingTable() {
@@ -1092,4 +1250,29 @@ public class GenericInMemoryCatalogTest {
                        "This is another view");
        }
 
+       /**
+        * Creates a catalog function backed by {@link MyScalarFunction}.
+        */
+       protected CatalogFunction createFunction() {
+               final String className = MyScalarFunction.class.getName();
+               return new GenericCatalogFunction(className);
+       }
+
+       /**
+        * Creates a catalog function backed by {@link MyOtherScalarFunction}.
+        */
+       protected CatalogFunction createAnotherFunction() {
+               final String className = MyOtherScalarFunction.class.getName();
+               return new GenericCatalogFunction(className);
+       }
+
+       /**
+        * Test UDF that returns its argument incremented by one.
+        */
+       public static class MyScalarFunction extends ScalarFunction {
+               public Integer eval(Integer i) {
+                       final int incremented = i + 1;
+                       return incremented;
+               }
+       }
+
+       /**
+        * Test UDF that returns the string form of its argument.
+        */
+       public static class MyOtherScalarFunction extends ScalarFunction {
+               public String eval(Integer i) {
+                       // Same result as String.valueOf(Object): "null" for a null argument.
+                       return i == null ? "null" : i.toString();
+               }
+       }
 }
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogFunction.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogFunction.java
new file mode 100644
index 0000000..9dbb791
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogFunction.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Interface for a function in a catalog.
+ *
+ * <p>A function exposes the name of its backing class, a map of string
+ * properties, and optional short and detailed descriptions.
+ */
+public interface CatalogFunction {
+
+       /**
+        * Get the full name of the class backing the function.
+        *
+        * @return the full name of the class
+        */
+       String getClassName();
+
+       /**
+        * Get the properties of the function.
+        *
+        * @return the properties of the function, as string key/value pairs
+        */
+       Map<String, String> getProperties();
+
+       /**
+        * Create a deep copy of the function.
+        *
+        * @return a deep copy of "this" instance
+        */
+       CatalogFunction copy();
+
+       /**
+        * Get a brief description of the function.
+        *
+        * @return an optional short description of the function
+        */
+       Optional<String> getDescription();
+
+       /**
+        * Get a detailed description of the function.
+        *
+        * @return an optional long description of the function
+        */
+       Optional<String> getDetailedDescription();
+
+}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableCatalog.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableCatalog.java
index f0b675d..1b08736 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableCatalog.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableCatalog.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.catalog;
 
 import org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
 import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
 import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
 import org.apache.flink.table.catalog.exceptions.TableNotExistException;
@@ -191,4 +192,36 @@ public interface ReadableCatalog {
         */
        boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec 
partitionSpec) throws CatalogException;
 
+       // ------ functions ------
+
+       /**
+        * List the names of all functions in the given database.
+        * An empty list is returned if none is registered.
+        *
+        * @param dbName name of the database
+        * @return a list of the names of the functions in this database
+        * @throws DatabaseNotExistException if the database does not exist
+        * @throws CatalogException in case of any runtime exception
+        */
+       List<String> listFunctions(String dbName) throws 
DatabaseNotExistException, CatalogException;
+
+       /**
+        * Get the function registered under the given path.
+        *
+        * @param functionPath path of the function
+        * @return the requested function
+        * @throws FunctionNotExistException if the function does not exist in
+        *         the catalog
+        * @throws CatalogException in case of any runtime exception
+        */
+       CatalogFunction getFunction(ObjectPath functionPath) throws 
FunctionNotExistException, CatalogException;
+
+       /**
+        * Check whether a function exists or not.
+        *
+        * @param functionPath path of the function
+        * @return true if the function exists in the catalog, false otherwise
+        * @throws CatalogException in case of any runtime exception
+        */
+       boolean functionExists(ObjectPath functionPath) throws CatalogException;
+
 }
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableWritableCatalog.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableWritableCatalog.java
index 755ae27..035b049 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableWritableCatalog.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableWritableCatalog.java
@@ -22,6 +22,8 @@ import 
org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
 import 
org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
 import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
 import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
@@ -195,4 +197,47 @@ public interface ReadableWritableCatalog extends 
ReadableCatalog {
        void alterPartition(ObjectPath tablePath, CatalogPartitionSpec 
partitionSpec, CatalogPartition newPartition, boolean ignoreIfNotExists)
                throws TableNotExistException, TableNotPartitionedException, 
PartitionSpecInvalidException, PartitionNotExistException, CatalogException;
 
+       // ------ functions ------
+
+       /**
+        * Create a function.
+        *
+        * @param functionPath      path of the function
+        * @param function          the function to be created
+        * @param ignoreIfExists    flag to specify behavior if a function with 
the given name already exists:
+        *                          if set to false, it throws a 
FunctionAlreadyExistException,
+        *                          if set to true, nothing happens.
+        * @throws FunctionAlreadyExistException if the function already exists
+        * @throws DatabaseNotExistException     if the given database does not 
exist
+        * @throws CatalogException in case of any runtime exception
+        */
+       void createFunction(ObjectPath functionPath, CatalogFunction function, 
boolean ignoreIfExists)
+               throws FunctionAlreadyExistException, 
DatabaseNotExistException, CatalogException;
+
+       /**
+        * Modify an existing function.
+        *
+        * @param functionPath       path of the function
+        * @param newFunction        the new definition of the function
+        * @param ignoreIfNotExists  flag to specify behavior if the function 
does not exist:
+        *                           if set to false, throw an exception
+        *                           if set to true, nothing happens
+        * @throws FunctionNotExistException if the function does not exist
+        * @throws CatalogException in case of any runtime exception
+        */
+       void alterFunction(ObjectPath functionPath, CatalogFunction 
newFunction, boolean ignoreIfNotExists)
+               throws FunctionNotExistException, CatalogException;
+
+       /**
+        * Drop a function.
+        *
+        * @param functionPath       path of the function to be dropped
+        * @param ignoreIfNotExists  flag to specify behavior if the function 
does not exist:
+        *                           if set to false, throw an exception
+        *                           if set to true, nothing happens
+        * @throws FunctionNotExistException if the function does not exist
+        * @throws CatalogException in case of any runtime exception
+        */
+       void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists) 
throws FunctionNotExistException,
+               CatalogException;
 }
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/FunctionAlreadyExistException.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/FunctionAlreadyExistException.java
new file mode 100644
index 0000000..c839133
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/FunctionAlreadyExistException.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.exceptions;
+
+import org.apache.flink.table.catalog.ObjectPath;
+
+/**
+ * Exception for trying to create a function that already exists.
+ */
+public class FunctionAlreadyExistException extends Exception {
+
+       private static final String MSG = "Function %s already exists in 
Catalog %s.";
+
+       public FunctionAlreadyExistException(String catalogName, ObjectPath 
functionPath) {
+               this(catalogName, functionPath, null);
+       }
+
+       public FunctionAlreadyExistException(String catalogName, ObjectPath 
functionPath, Throwable cause) {
+               super(String.format(MSG, functionPath.getFullName(), 
catalogName), cause);
+       }
+
+}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/FunctionNotExistException.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/FunctionNotExistException.java
new file mode 100644
index 0000000..063f60f
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/FunctionNotExistException.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.exceptions;
+
+import org.apache.flink.table.catalog.ObjectPath;
+
+/**
+ * Exception for trying to operate on a function that doesn't exist.
+ */
+public class FunctionNotExistException extends Exception {
+
+       private static final String MSG = "Function %s does not exist in 
Catalog %s.";
+
+       public FunctionNotExistException(String catalogName, ObjectPath 
functionPath) {
+               this(catalogName, functionPath, null);
+       }
+
+       public FunctionNotExistException(String catalogName, ObjectPath 
functionPath, Throwable cause) {
+               super(String.format(MSG, functionPath.getFullName(), 
catalogName), cause);
+       }
+
+}

Reply via email to