KarmaGYZ commented on code in PR #22938:
URL: https://github.com/apache/flink/pull/22938#discussion_r1336771033


##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java:
##########
@@ -131,10 +132,17 @@ public boolean dropTemporarySystemFunction(String name, 
boolean ignoreIfNotExist
                             "Could not drop temporary system function. A 
function named '%s' doesn't exist.",
                             name));
         }
+        unregisterFunctionJarResources(function);
 
         return function != null;
     }
 
+    private void unregisterFunctionJarResources(CatalogFunction function) {
+        if (function != null && function.getFunctionLanguage() == 
FunctionLanguage.JAVA) {

Review Comment:
   Should we also apply the same logic to scala and python function? E.g. when 
a python udf registered, we add flink-python jar to the classpath.



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/resource/ResourceManager.java:
##########
@@ -124,15 +136,79 @@ public String registerFileResource(ResourceUri 
resourceUri) throws IOException {
                         Collections.singletonList(resourceUri),
                         ResourceType.FILE,
                         false,
-                        url -> {});
+                        url -> {},
+                        false);
         registerResources(stagingResources, false);
         return resourceInfos.get(new 
ArrayList<>(stagingResources.keySet()).get(0)).getPath();
     }
 
+    /**
+     * Register a resource for function and add it to the function resource 
infos. If the file is
+     * remote, it will be copied to a local file.
+     *
+     * @param resourceUris the resource uri for function.
+     */
+    public void registerFunctionResources(Set<ResourceUri> resourceUris) 
throws IOException {
+        prepareStagingResources(
+                resourceUris,
+                ResourceType.JAR,
+                true,
+                url -> {
+                    try {
+                        JarUtils.checkJarFile(url);
+                    } catch (IOException e) {
+                        throw new ValidationException(
+                                String.format("Failed to register jar resource 
[%s]", url), e);
+                    }
+                },
+                true);
+    }
+
+    /**
+     * Unregister the resource uri in function resources, when the reference 
count of the resource
+     * is 0, the resource will be removed from the function resources.
+     *
+     * @param resourceUris the uris to unregister in function resources.
+     */
+    public void unregisterFunctionResources(List<ResourceUri> resourceUris) {
+        if (!resourceUris.isEmpty()) {
+            resourceUris.forEach(
+                    uri -> {
+                        ResourceCounter counter = 
functionResourceInfos.get(uri);
+                        if (counter != null && counter.decreaseCounter()) {
+                            functionResourceInfos.remove(uri);
+                        }

Review Comment:
   Will we meet any multi-thread issue here?



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/resource/ResourceManager.java:
##########
@@ -124,15 +136,79 @@ public String registerFileResource(ResourceUri 
resourceUri) throws IOException {
                         Collections.singletonList(resourceUri),
                         ResourceType.FILE,
                         false,
-                        url -> {});
+                        url -> {},
+                        false);
         registerResources(stagingResources, false);
         return resourceInfos.get(new 
ArrayList<>(stagingResources.keySet()).get(0)).getPath();
     }
 
+    /**
+     * Register a resource for function and add it to the function resource 
infos. If the file is
+     * remote, it will be copied to a local file.
+     *
+     * @param resourceUris the resource uri for function.
+     */
+    public void registerFunctionResources(Set<ResourceUri> resourceUris) 
throws IOException {
+        prepareStagingResources(
+                resourceUris,
+                ResourceType.JAR,
+                true,
+                url -> {
+                    try {
+                        JarUtils.checkJarFile(url);
+                    } catch (IOException e) {
+                        throw new ValidationException(
+                                String.format("Failed to register jar resource 
[%s]", url), e);
+                    }
+                },
+                true);
+    }
+
+    /**
+     * Unregister the resource uri in function resources, when the reference 
count of the resource
+     * is 0, the resource will be removed from the function resources.
+     *
+     * @param resourceUris the uris to unregister in function resources.
+     */
+    public void unregisterFunctionResources(List<ResourceUri> resourceUris) {
+        if (!resourceUris.isEmpty()) {
+            resourceUris.forEach(
+                    uri -> {
+                        ResourceCounter counter = 
functionResourceInfos.get(uri);
+                        if (counter != null && counter.decreaseCounter()) {
+                            functionResourceInfos.remove(uri);
+                        }
+                    });
+        }
+    }
+
     public URLClassLoader getUserClassLoader() {
         return userClassLoader;
     }
 
+    public URLClassLoader createUserClassLoader(List<ResourceUri> 
resourceUris) {
+        if (resourceUris.isEmpty()) {
+            return userClassLoader;
+        }
+        MutableURLClassLoader classLoader = userClassLoader.copy();
+        for (ResourceUri resourceUri : new HashSet<>(resourceUris)) {
+            
classLoader.addURL(checkNotNull(functionResourceInfos.get(resourceUri)).url);
+        }
+        createdClassLoaderList.add(classLoader);
+
+        return classLoader;
+    }
+
+    public void closeUserClassLoader(URLClassLoader classLoader) {
+        if (createdClassLoaderList.remove(classLoader)) {

Review Comment:
   Not sure will it remove the correct classloader we want to closed if there 
are multiple classloader are exactly same.



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java:
##########
@@ -619,11 +633,12 @@ private Optional<ContextResolvedFunction> 
resolveAmbiguousFunctionReference(Stri
 
         String normalizedName = FunctionIdentifier.normalizeName(funcName);
         if (tempSystemFunctions.containsKey(normalizedName)) {
+            CatalogFunction function = tempSystemFunctions.get(normalizedName);
+            registerFunctionJarResources(funcName, 
function.getFunctionResources());

Review Comment:
   ditto.



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java:
##########
@@ -44,7 +44,11 @@
 import org.apache.flink.table.resource.ResourceUri;
 import org.apache.flink.util.Preconditions;
 
+import javax.annotation.Nullable;

Review Comment:
   Do we also need to cover the registerCatalogFunction/dropCatalogFunction?



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/resource/ResourceManager.java:
##########
@@ -419,4 +527,26 @@ private void registerResources(
                     LOG.info("Register resource [{}] successfully.", 
resourceUri.getUri());
                 });
     }
+
+    /**
+     * Resource with reference counter, when the counter is 0, it means the 
resource can be removed.
+     */
+    static class ResourceCounter {
+        final URL url;
+        int counter;
+
+        private ResourceCounter(URL url) {
+            this.url = url;
+            this.counter = 0;
+        }
+
+        private void increaseCounter() {
+            this.counter++;
+        }
+
+        private boolean decreaseCounter() {
+            this.counter--;
+            return this.counter == 0;

Review Comment:
   We'd better add a safetynet here in case the counter incorrectly decreased 
to negative.



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java:
##########
@@ -661,13 +676,29 @@ private void validateAndPrepareFunction(String name, 
CatalogFunction function)
         } else if (function.getFunctionLanguage() == FunctionLanguage.JAVA) {
             // If the jar resource of UDF used is not empty, register it to 
classloader before
             // validate.
-            registerFunctionJarResources(name, 
function.getFunctionResources());
+            List<ResourceUri> resourceUris = function.getFunctionResources();
+            try {
+                if (!resourceUris.isEmpty()) {
+                    resourceManager.registerFunctionResources(
+                            new HashSet<>(function.getFunctionResources()));

Review Comment:
   Can we also deduplicate the resource list in other resource registration?



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java:
##########
@@ -569,6 +581,8 @@ private Optional<ContextResolvedFunction> 
resolvePreciseFunctionReference(Object
         CatalogFunction potentialResult = 
tempCatalogFunctions.get(normalizedIdentifier);
 
         if (potentialResult != null) {
+            registerFunctionJarResources(
+                    oi.asSummaryString(), 
potentialResult.getFunctionResources());

Review Comment:
   Not quite familiar with this component. Could you help me to understand why 
we need to register resource here?



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/resource/ResourceManager.java:
##########
@@ -179,6 +255,15 @@ public void close() throws IOException {
             LOG.debug("Error while closing user classloader.", e);
             exception = e;
         }

Review Comment:
   Also clear the functionResourceInfos.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to