dawidwys commented on a change in pull request #11785:
URL: https://github.com/apache/flink/pull/11785#discussion_r412145601



##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
##########
@@ -622,4 +612,89 @@ private void registerTempCatalogFunction(ObjectIdentifier 
oi, FunctionDefinition
                        Optional.of(new 
FunctionLookup.Result(FunctionIdentifier.of(funcName), fd)
                )).orElseGet(() -> resolvePreciseFunctionReference(oi));
        }
+
+       private void validateAndPrepareFunction(CatalogFunction function) 
throws ClassNotFoundException {
+               // If the input is instance of UserDefinedFunction, it means it 
uses the new type inference.
+               // In this situation the UDF have not been validated and 
cleaned, so we need to validate it
+               // and clean its closure here.
+               // If the input is instance of `ScalarFunctionDefinition`, 
`TableFunctionDefinition` and so on,
+               // it means it uses the old type inference. We assume that they 
have been validated before being
+               // wrapped.
+               if (function instanceof InlineCatalogFunction &&
+                       ((InlineCatalogFunction) function).getDefinition() 
instanceof UserDefinedFunction) {
+
+                       FunctionDefinition definition = 
((InlineCatalogFunction) function).getDefinition();
+                       UserDefinedFunctionHelper.prepareInstance(config, 
(UserDefinedFunction) definition);
+               } else if (function.getFunctionLanguage() == 
FunctionLanguage.JAVA) {
+                       ClassLoader contextClassLoader = 
Thread.currentThread().getContextClassLoader();
+                       UserDefinedFunctionHelper.validateClass(
+                               (Class<? extends UserDefinedFunction>) 
contextClassLoader
+                                       .loadClass(function.getClassName()));
+               }
+       }
+
+       private FunctionDefinition getFunctionDefinition(String name, 
CatalogFunction function) {
+               if (function instanceof InlineCatalogFunction) {
+                       // The instantiated UDFs have been validated and 
cleaned when registering, just return them
+                       // directly.
+                       return ((InlineCatalogFunction) 
function).getDefinition();
+               }
+               // Currently the uninstantiated functions are all from sql and 
catalog that use the old type inference,
+               // so using FunctionDefinitionUtil to instantiate them and wrap 
them with `ScalarFunctionDefinition`,
+               // `TableFunctionDefinition`, etc. If the new type inference is 
fully functional, this should be
+               // changed to use 
`UserDefinedFunctionHelper#instantiateFunction`.
+               return FunctionDefinitionUtil.createFunctionDefinition(name, 
function.getClassName());

Review comment:
       Just a note: Actually we should use the new type stack for functions 
created from DDL as soon as possible. We could already support it for scalar 
and table functions. It does not work yet for aggregate functions. I am fine 
with the current approach as it is the same as before this PR
   
   

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
##########
@@ -622,4 +612,89 @@ private void registerTempCatalogFunction(ObjectIdentifier 
oi, FunctionDefinition
                        Optional.of(new 
FunctionLookup.Result(FunctionIdentifier.of(funcName), fd)
                )).orElseGet(() -> resolvePreciseFunctionReference(oi));
        }
+
+       private void validateAndPrepareFunction(CatalogFunction function) 
throws ClassNotFoundException {

Review comment:
       Add `@SuppressWarnings("unchecked")`.
   
   Will add that when merging.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to