fsk119 commented on code in PR #22938:
URL: https://github.com/apache/flink/pull/22938#discussion_r1251814130


##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java:
##########
@@ -661,12 +673,24 @@ private void validateAndPrepareFunction(String name, 
CatalogFunction function)
         } else if (function.getFunctionLanguage() == FunctionLanguage.JAVA) {
             // If the jar resource of UDF used is not empty, register it to 
classloader before
             // validate.
-            registerFunctionJarResources(name, 
function.getFunctionResources());
+            List<ResourceUri> resourceUris = function.getFunctionResources();
+            try {
+                if (!resourceUris.isEmpty()) {
+                    resourceManager.registerFunctionResources(
+                            new HashSet<>(function.getFunctionResources()));
+                }
+            } catch (Exception e) {
+                throw new TableException(
+                        String.format(
+                                "Failed to register function jar resource '%s' 
of function '%s'.",
+                                resourceUris, name),
+                        e);
+            }
 
             UserDefinedFunctionHelper.validateClass(
                     (Class<? extends UserDefinedFunction>)
                             resourceManager
-                                    .getUserClassLoader()
+                                    
.createUserClassLoader(function.getFunctionResources())

Review Comment:
   No one is responsible for the created classloader. The resource leaks.



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java:
##########
@@ -131,10 +132,17 @@ public boolean dropTemporarySystemFunction(String name, 
boolean ignoreIfNotExist
                             "Could not drop temporary system function. A 
function named '%s' doesn't exist.",
                             name));
         }
+        unregisterFunctionJarResources(function);

Review Comment:
   Does hive or spark remove the resources from the ResourceManager? @lsyldliu 



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/resource/ResourceManager.java:
##########
@@ -65,6 +69,9 @@ public class ResourceManager implements Closeable {
     private static final String FILE_SCHEME = "file";
 
     private final Path localResourceDir;
+    /** Resource infos for functions. */
+    private final FunctionResourceManager functionResources;

Review Comment:
   Do we need to introduce a FunctionResourceManager. Actually, the 
ResourceManager doesn't distinguish the jar is from Funciton or Connector. I 
think we can just move the unregister method from the `FunctionResourceManager` 
to here.



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java:
##########
@@ -131,10 +132,17 @@ public boolean dropTemporarySystemFunction(String name, 
boolean ignoreIfNotExist
                             "Could not drop temporary system function. A 
function named '%s' doesn't exist.",
                             name));
         }
+        unregisterFunctionJarResources(function);
 
         return function != null;
     }
 
+    private void unregisterFunctionJarResources(CatalogFunction function) {

Review Comment:
   The parameter should be nullable.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to