zjuwangg commented on a change in pull request #8920: [FLINK-13024][table] 
integrate FunctionCatalog with CatalogManager
URL: https://github.com/apache/flink/pull/8920#discussion_r298905148
 
 

 ##########
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
 ##########
 @@ -136,19 +172,66 @@ public void registerScalarFunction(String name, 
ScalarFunction function) {
 
                        foundDefinition = 
BuiltInFunctionDefinitions.getDefinitions()
                                .stream()
-                               .filter(f -> 
normalizeName(name).equals(normalizeName(f.getName())))
+                               .filter(f -> 
functionName.equals(normalizeName(f.getName())))
                                .findFirst()
                                .map(Function.identity());
                }
 
                return foundDefinition.map(definition -> new 
FunctionLookup.Result(
-                       ObjectIdentifier.of(defaultCatalogName, 
defaultDatabaseName, name),
+                       ObjectIdentifier.of(catalogManager.getCurrentCatalog(), 
catalogManager.getCurrentDatabase(), name),
                        definition)
                );
        }
 
+       private static Object initiate(ClassLoader classLoader, String 
functionIdentifier) {
+               try {
+                       return 
classLoader.loadClass(functionIdentifier).newInstance();
+               } catch (InstantiationException | IllegalAccessException | 
ClassNotFoundException e) {
+                       throw new TableException(
+                               String.format("Failed to initiate an instance 
of class %s.", functionIdentifier), e);
+               }
+       }
+
        private void registerFunction(String name, FunctionDefinition 
functionDefinition) {
-               userFunctions.put(normalizeName(name), functionDefinition);
+
+               if (functionDefinition instanceof BuiltInFunctionDefinition) {
+                       throw new TableException("Cannot register a built-in 
Flink function");
+               } else {
+                       Catalog catalog = 
catalogManager.getCatalog(catalogManager.getCurrentCatalog()).get();
+
+                       String functionIdentifier;
+
+                       if (functionDefinition.getKind() == 
FunctionKind.SCALAR) {
 
 Review comment:
   Minor:
   Could we use a switch/case on `functionDefinition.getKind()` here instead of the if/else chain, to make it more readable?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to