bowenli86 commented on a change in pull request #8212: [FLINK-11519][table] Add
function related catalog APIs
URL: https://github.com/apache/flink/pull/8212#discussion_r276850319
##########
File path:
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
##########
@@ -269,4 +273,81 @@ public boolean tableExists(ObjectPath tablePath) {
return tablePath != null &&
databaseExists(tablePath.getDatabaseName()) && tables.containsKey(tablePath);
}
+ // ------ functions ------
+
+ @Override
+ public void createFunction(ObjectPath functionPath, CatalogFunction
function, boolean ignoreIfExists)
+ throws FunctionAlreadyExistException, DatabaseNotExistException
{
+ checkArgument(functionPath != null);
+ checkArgument(function != null);
+
+ if (!databaseExists(functionPath.getDatabaseName())) {
+ throw new DatabaseNotExistException(catalogName,
functionPath.getDatabaseName());
+ }
+
+ if (functionExists(functionPath)) {
+ if (!ignoreIfExists) {
+ throw new
FunctionAlreadyExistException(catalogName, functionPath);
+ }
+ } else {
+ functions.put(functionPath, function.copy());
+ }
+ }
+
+ @Override
+ public void alterFunction(ObjectPath functionPath, CatalogFunction
newFunction, boolean ignoreIfNotExists)
+ throws FunctionNotExistException {
+ checkArgument(functionPath != null);
+ checkArgument(newFunction != null);
+
+ if (functionExists(functionPath)) {
+ functions.put(functionPath, newFunction.copy());
+ } else if (!ignoreIfNotExists) {
+ throw new FunctionNotExistException(catalogName,
functionPath);
+ }
+ }
+
+ @Override
+ public void dropFunction(ObjectPath functionPath, boolean
ignoreIfNotExists) throws FunctionNotExistException {
+ checkArgument(functionPath != null);
+
+ if (functionExists(functionPath)) {
+ functions.remove(functionPath);
+ } else if (!ignoreIfNotExists) {
+ throw new FunctionNotExistException(catalogName,
functionPath);
+ }
+ }
+
+ @Override
+ public List<String> listFunctions(String databaseName) throws
DatabaseNotExistException {
+
checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName), "databaseName
cannot be null or empty");
+
+ String db = databaseName.toLowerCase();
Review comment:
probably remove this for now?
I think making Flink case-insensitive to meta-objects may be better on its
own effort.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services