dawidwys commented on a change in pull request #11785:
URL: https://github.com/apache/flink/pull/11785#discussion_r411497262



##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
##########
@@ -622,4 +612,90 @@ private void registerTempCatalogFunction(ObjectIdentifier oi, FunctionDefinition
                        Optional.of(new FunctionLookup.Result(FunctionIdentifier.of(funcName), fd)
                )).orElseGet(() -> resolvePreciseFunctionReference(oi));
        }
+
+       private void validateAndPrepareFunction(CatalogFunction function) throws ClassNotFoundException {
+               // If the input is instance of UserDefinedFunction, it means it uses the new type inference.
+               // In this situation the UDF have not been validated and cleaned, so we need to validate it
+               // and clean its closure here.
+               // If the input is instance of `ScalarFunctionDefinition`, `TableFunctionDefinition` and so on,
+               // it means it uses the old type inference. We assume that they have been validated before being
+               // wrapped.
+               if (function instanceof InlineCatalogFunction &&
+                       ((InlineCatalogFunction) function).getDefinition() instanceof UserDefinedFunction) {
+
+                       FunctionDefinition definition = ((InlineCatalogFunction) function).getDefinition();
+                       UserDefinedFunctionHelper.prepareInstance(config, (UserDefinedFunction) definition);
+               } else if (function.getFunctionLanguage() == FunctionLanguage.JAVA) {
+                       ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
+                       UserDefinedFunctionHelper.validateClass(
+                               (Class<? extends UserDefinedFunction>) contextClassLoader
+                                       .loadClass(function.getClassName()));
+               }
+       }
+
+       private FunctionDefinition getFunctionDefinition(String name, CatalogFunction function) {
+               if (function instanceof InlineCatalogFunction) {
+                       // The instantiated UDFs have been validated and cleaned when registering, just return them
+                       // directly.
+                       return ((InlineCatalogFunction) function).getDefinition();
+               }
+               // Currently the uninstantiated functions are all from sql and catalog that use the old type inference,
+               // so using FunctionDefinitionUtil to instantiate them and wrap them with `ScalarFunctionDefinition`,
+               // `TableFunctionDefinition`, etc. If the new type inference is fully functional, this should be
+               // changed to use `UserDefinedFunctionHelper#instantiateFunction`.
+               return FunctionDefinitionUtil.createFunctionDefinition(name, function.getClassName());
+       }
+
+       /**
+        * The CatalogFunction which holds a instantiated UDF.
+        */
+       private static class InlineCatalogFunction implements CatalogFunction {
+
+               private final FunctionDefinition definition;
+
+               InlineCatalogFunction(FunctionDefinition definition) {
+                       this.definition = definition;
+               }
+
+               @Override
+               public String getClassName() {
+                       // Not all instantiated UDFs have a class name, such as Python Lambda UDF. Even if the UDF
+                       // has a class name, there is no guarantee that the new UDF object constructed from the
+                       // class name is the same as the UDF held by this object. To reduce the chance of making
+                       // mistakes, UnsupportedOperationException is thrown here.
+                       throw new UnsupportedOperationException(
+                               "This CatalogFunction is a InlineCatalogFunction. This method should not be called.");
+               }
+
+               @Override
+               public CatalogFunction copy() {
+                       return new InlineCatalogFunction(definition);
+               }
+
+               @Override
+               public Optional<String> getDescription() {
+                       return Optional.empty();
+               }
+
+               @Override
+               public Optional<String> getDetailedDescription() {
+                       return Optional.empty();
+               }
+
+               @Override
+               public boolean isGeneric() {
+                       throw new UnsupportedOperationException(
+                               "This CatalogFunction is a InlineCatalogFunction. This method should not be called.");
+               }
+
+               @Override
+               public FunctionLanguage getFunctionLanguage() {
+                       throw new UnsupportedOperationException(

Review comment:
       Can't we support this method? I think it should be rather 
straightforward to pass the language from which the function originated. For 
now it will always be `JAVA` if I am correct.
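
Roughly what I have in mind, as a minimal sketch rather than a concrete proposal for the patch. The types below (`FunctionLanguage`, `FunctionDefinition`, `CatalogFunction`) are simplified stand-ins declared only so the snippet compiles on its own; they are not the real Flink interfaces, and the extra constructor parameter is my assumption about how the language could be passed in.

```java
import java.util.Objects;
import java.util.Optional;

// Stand-in types for illustration only; NOT the real Flink classes.
final class InlineCatalogFunctionSketch {

    // Assumption: reduced to two values for the sketch.
    enum FunctionLanguage { JAVA, PYTHON }

    // Hypothetical marker standing in for the definition held by the wrapper.
    interface FunctionDefinition { }

    // Reduced to the methods relevant to this comment.
    interface CatalogFunction {
        CatalogFunction copy();
        Optional<String> getDescription();
        FunctionLanguage getFunctionLanguage();
    }

    static final class InlineCatalogFunction implements CatalogFunction {

        private final FunctionDefinition definition;
        private final FunctionLanguage language;

        // The language is handed over at registration time instead of being unsupported.
        InlineCatalogFunction(FunctionDefinition definition, FunctionLanguage language) {
            this.definition = Objects.requireNonNull(definition);
            this.language = Objects.requireNonNull(language);
        }

        FunctionDefinition getDefinition() {
            return definition;
        }

        @Override
        public CatalogFunction copy() {
            // The copy keeps both the definition and the originating language.
            return new InlineCatalogFunction(definition, language);
        }

        @Override
        public Optional<String> getDescription() {
            return Optional.empty();
        }

        @Override
        public FunctionLanguage getFunctionLanguage() {
            // Returns the stored language rather than throwing.
            return language;
        }
    }

    public static void main(String[] args) {
        FunctionDefinition definition = new FunctionDefinition() { };
        CatalogFunction function = new InlineCatalogFunction(definition, FunctionLanguage.JAVA);
        System.out.println(function.getFunctionLanguage());
    }
}
```

With something like this, callers that branch on `getFunctionLanguage()` would not need to check for `InlineCatalogFunction` first just to avoid the exception.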



