fsk119 commented on code in PR #20001:
URL: https://github.com/apache/flink/pull/20001#discussion_r920780739


##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/resource/ResourceManager.java:
##########
@@ -69,43 +74,58 @@ public URLClassLoader getUserClassLoader() {
         return userClassLoader;
     }
 
-    public void registerResource(ResourceUri resourceUri) throws IOException {
-        // check whether the resource has been registered
-        if (resourceInfos.containsKey(resourceUri)) {
-            LOG.info(
-                    "Resource [{}] has been registered, overwriting of 
registered resource is not supported "
-                            + "in the current version, skipping.",
-                    resourceUri.getUri());
-            return;
-        }
+    /**
+     * Due to anyone of the resource in list maybe fail during register, so we 
should stage it
+     * before actual register to guarantee transaction process. If all the 
resources are avaliable,

Review Comment:
   typo: avaliable -> available



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/resource/ResourceManager.java:
##########
@@ -119,19 +139,44 @@ public Set<URL> getJarResourceURLs() {
                 .collect(Collectors.toSet());
     }
 
-    private void checkResource(Path path) throws IOException {
-        FileSystem fs = path.getFileSystem();
-        // check resource exists firstly
-        if (!fs.exists(path)) {
-            throw new FileNotFoundException(String.format("Resource [%s] not 
found.", path));
-        }
-
-        // register directory is not allowed for resource
-        if (fs.getFileStatus(path).isDir()) {
+    private void checkJarResource(List<ResourceUri> resourceUris) throws 
IOException {
+        // only support register jar resource
+        if (resourceUris.stream()
+                .anyMatch(resourceUri -> 
!ResourceType.JAR.equals(resourceUri.getResourceType()))) {
             throw new IOException(
                     String.format(
-                            "The resource [%s] is a directory, however, the 
directory is not allowed for registering resource.",
-                            path));
+                            "Only support to register jar resource, resource 
info:\n %s.",
+                            resourceUris.stream()
+                                    .map(ResourceUri::getUri)
+                                    .collect(Collectors.joining(",\n"))));
+        }
+
+        for (ResourceUri resourceUri : resourceUris) {
+            // here can check whether the resource path is valid
+            Path path = new Path(resourceUri.getUri());
+            // file name should end with .jar suffix
+            String fileExtension = Files.getFileExtension(path.getName());
+            if (!fileExtension.toLowerCase().endsWith(JAR_SUFFIX)) {
+                throw new IOException(
+                        String.format(
+                                "The registering jar resource [%s] must ends 
with '.jar' suffix.",
+                                path));
+            }
+
+            FileSystem fs = FileSystem.getUnguardedFileSystem(path.toUri());
+            // check resource exists firstly
+            if (!fs.exists(path)) {
+                throw new FileNotFoundException(
+                        String.format("Jar resource [%s] not found.", path));
+            }
+
+            // register directory is not allowed for resource
+            if (fs.getFileStatus(path).isDir()) {
+                throw new IOException(
+                        String.format(
+                                "The registering jar resource [%s] is a 
directory, however directory is not allowed to register.",
+                                path));

Review Comment:
   "The registering jar resource [%s] is a directory that is not allowed."



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/resource/ResourceManager.java:
##########
@@ -119,19 +139,44 @@ public Set<URL> getJarResourceURLs() {
                 .collect(Collectors.toSet());
     }
 
-    private void checkResource(Path path) throws IOException {
-        FileSystem fs = path.getFileSystem();
-        // check resource exists firstly
-        if (!fs.exists(path)) {
-            throw new FileNotFoundException(String.format("Resource [%s] not 
found.", path));
-        }
-
-        // register directory is not allowed for resource
-        if (fs.getFileStatus(path).isDir()) {
+    private void checkJarResource(List<ResourceUri> resourceUris) throws 
IOException {
+        // only support register jar resource
+        if (resourceUris.stream()
+                .anyMatch(resourceUri -> 
!ResourceType.JAR.equals(resourceUri.getResourceType()))) {
             throw new IOException(
                     String.format(
-                            "The resource [%s] is a directory, however, the 
directory is not allowed for registering resource.",
-                            path));
+                            "Only support to register jar resource, resource 
info:\n %s.",
+                            resourceUris.stream()
+                                    .map(ResourceUri::getUri)
+                                    .collect(Collectors.joining(",\n"))));
+        }
+
+        for (ResourceUri resourceUri : resourceUris) {
+            // here can check whether the resource path is valid
+            Path path = new Path(resourceUri.getUri());
+            // file name should end with .jar suffix
+            String fileExtension = Files.getFileExtension(path.getName());
+            if (!fileExtension.toLowerCase().endsWith(JAR_SUFFIX)) {
+                throw new IOException(
+                        String.format(
+                                "The registering jar resource [%s] must ends 
with '.jar' suffix.",
+                                path));
+            }
+
+            FileSystem fs = FileSystem.getUnguardedFileSystem(path.toUri());
+            // check resource exists firstly
+            if (!fs.exists(path)) {
+                throw new FileNotFoundException(
+                        String.format("Jar resource [%s] not found.", path));
+            }
+
+            // register directory is not allowed for resource
+            if (fs.getFileStatus(path).isDir()) {
+                throw new IOException(

Review Comment:
   ditto



##########
flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/resource/ResourceManagerTest.java:
##########
@@ -83,9 +82,49 @@ public void testRegisterResource() throws Exception {
                 ClassNotFoundException.class,
                 () -> Class.forName(LOWER_UDF_CLASS, false, userClassLoader));
 
+        ResourceUri resourceUri = new ResourceUri(ResourceType.JAR, 
udfJar.getPath());
         // register the same jar repeatedly
-        resourceManager.registerResource(new ResourceUri(ResourceType.JAR, 
udfJar.getPath()));
-        resourceManager.registerResource(new ResourceUri(ResourceType.JAR, 
udfJar.getPath()));
+        resourceManager.registerJarResource(Arrays.asList(resourceUri, 
resourceUri));
+
+        // assert resource infos
+        Map<ResourceUri, URL> expected =
+                Collections.singletonMap(
+                        resourceUri, resourceManager.getURLFromPath(new 
Path(udfJar.getPath())));
+
+        assertEquals(expected, resourceManager.getResources());
+
+        // test load class
+        final Class<?> clazz1 = Class.forName(LOWER_UDF_CLASS, false, 
userClassLoader);
+        final Class<?> clazz2 = Class.forName(LOWER_UDF_CLASS, false, 
userClassLoader);
+
+        assertEquals(clazz1, clazz2);
+
+        resourceManager.close();
+    }
+
+    @Test
+    public void testRegisterResourceWithRelativePath() throws Exception {
+        ResourceManager resourceManager =
+                ResourceUtils.createResourceManager(
+                        new URL[0], getClass().getClassLoader(), new 
Configuration());

Review Comment:
   How about moving the creation of the ResourceManager into a setup (before) 
method and closing it in a teardown (after) method?



##########
flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/resource/ResourceManagerTest.java:
##########
@@ -139,7 +196,9 @@ public void testRegisterInvalidResource() throws Exception {
     @Test
     public void testDownloadResource() throws Exception {

Review Comment:
   Also check whether the downloaded resource has been deleted after closing 
the `ResourceManager`.



##########
flink-table/flink-sql-client/src/main/java/org/apache/flink/util/ClientMutableURLClassLoader.java:
##########
@@ -0,0 +1,140 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * This class loader extends {@link MutableURLClassLoader}, upon the {@code 
addURL} method, it also
+ * exposes a {@code removeURL} method which used to remove unnecessary jar 
from current classloader
+ * path. This class loader wraps a {@link MutableURLClassLoader} and an old 
classloader list, the
+ * class load is delegated to the inner {@link MutableURLClassLoader}.
+ *
+ * <p>This is only used to SqlClient for supporting {@code REMOVE JAR} clause 
currently. When remove
+ * a jar, get the registered jar url list from current {@link 
MutableURLClassLoader} firstly, then
+ * create a new instance of {@link MutableURLClassLoader} which urls doesn't 
include the removed
+ * jar, and the currentClassLoader point to new instance object, the old 
object is added to list to
+ * be closed when close {@link ClientMutableURLClassLoader}.
+ *
+ * <p>Note: This classloader is not guaranteed to actually remove class or 
resource, any classes or
+ * resources in the removed jar that are already loaded, are still accessible.
+ */
+@Experimental
+@Internal
+public class ClientMutableURLClassLoader extends MutableURLClassLoader {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(ClientMutableURLClassLoader.class);
+
+    static {
+        ClassLoader.registerAsParallelCapable();
+    }
+
+    private final Configuration configuration;
+    private final List<MutableURLClassLoader> oldClassLoaders = new 
ArrayList<>();
+    private MutableURLClassLoader currentClassLoader;
+
+    public ClientMutableURLClassLoader(
+            Configuration configuration, MutableURLClassLoader 
mutableURLClassLoader) {
+        super(new URL[0], mutableURLClassLoader);
+        this.configuration = configuration;

Review Comment:
   It would be better to make a deep copy of the input configuration.



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/resource/ResourceManager.java:
##########
@@ -119,19 +139,44 @@ public Set<URL> getJarResourceURLs() {
                 .collect(Collectors.toSet());

Review Comment:
   Do we still need this method? It seems to duplicate 
`getJarResourceURLs`. Maybe it should be visible only for testing.



##########
flink-table/flink-sql-client/src/main/java/org/apache/flink/util/ClientMutableURLClassLoader.java:
##########
@@ -0,0 +1,140 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * This class loader extends {@link MutableURLClassLoader}, upon the {@code 
addURL} method, it also
+ * exposes a {@code removeURL} method which used to remove unnecessary jar 
from current classloader
+ * path. This class loader wraps a {@link MutableURLClassLoader} and an old 
classloader list, the
+ * class load is delegated to the inner {@link MutableURLClassLoader}.
+ *
+ * <p>This is only used to SqlClient for supporting {@code REMOVE JAR} clause 
currently. When remove
+ * a jar, get the registered jar url list from current {@link 
MutableURLClassLoader} firstly, then
+ * create a new instance of {@link MutableURLClassLoader} which urls doesn't 
include the removed
+ * jar, and the currentClassLoader point to new instance object, the old 
object is added to list to
+ * be closed when close {@link ClientMutableURLClassLoader}.
+ *
+ * <p>Note: This classloader is not guaranteed to actually remove class or 
resource, any classes or
+ * resources in the removed jar that are already loaded, are still accessible.
+ */
+@Experimental
+@Internal
+public class ClientMutableURLClassLoader extends MutableURLClassLoader {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(ClientMutableURLClassLoader.class);
+
+    static {
+        ClassLoader.registerAsParallelCapable();
+    }
+
+    private final Configuration configuration;
+    private final List<MutableURLClassLoader> oldClassLoaders = new 
ArrayList<>();
+    private MutableURLClassLoader currentClassLoader;
+
+    public ClientMutableURLClassLoader(
+            Configuration configuration, MutableURLClassLoader 
mutableURLClassLoader) {
+        super(new URL[0], mutableURLClassLoader);
+        this.configuration = configuration;
+        this.currentClassLoader = mutableURLClassLoader;
+    }
+
+    @Override
+    protected final Class<?> loadClass(String name, boolean resolve) throws 
ClassNotFoundException {
+        return currentClassLoader.loadClass(name, resolve);
+    }
+
+    @Override
+    public void addURL(URL url) {
+        currentClassLoader.addURL(url);
+    }
+
+    public void removeURL(URL url) {
+        Set<URL> registeredUrls =
+                
Stream.of(currentClassLoader.getURLs()).collect(Collectors.toSet());
+        if (!registeredUrls.contains(url)) {
+            LOG.warn(
+                    String.format(
+                            "Could not remove the specified jar because the 
jar path [%s] is not found in classloader.",
+                            url));
+            return;
+        }
+
+        // add current classloader to list
+        oldClassLoaders.add(currentClassLoader);
+        // remove url from registeredUrls
+        registeredUrls.remove(url);
+        // update current classloader point to a new MutableURLClassLoader 
instance
+        currentClassLoader =
+                MutableURLClassLoader.newInstance(
+                        registeredUrls.toArray(new URL[0]),
+                        currentClassLoader.getParent(),
+                        configuration);
+    }
+
+    @Override
+    public URL[] getURLs() {
+        return currentClassLoader.getURLs();
+    }
+
+    @Override
+    public void close() throws IOException {
+        IOException exception = null;
+        try {
+            // close current classloader
+            currentClassLoader.close();
+        } catch (IOException e) {
+            LOG.debug("Error while closing class loader in 
ClientMutableURLClassLoader.", e);
+            exception = e;
+        }
+
+        // close other classloader in the list
+        for (MutableURLClassLoader classLoader : oldClassLoaders) {
+            try {
+                classLoader.close();
+            } catch (IOException ioe) {
+                LOG.debug(
+                        "Error while closing older class loader in 
ClientMutableURLClassLoader.",
+                        ioe);
+                exception = ExceptionUtils.firstOrSuppressed(ioe, exception);

Review Comment:
   I think we should set the exception if it is null.



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java:
##########
@@ -949,4 +954,8 @@ public ResolvedCatalogView resolveCatalogView(CatalogView 
view) {
         final ResolvedSchema resolvedSchema = 
view.getUnresolvedSchema().resolve(schemaResolver);
         return new ResolvedCatalogView(view, resolvedSchema);
     }
+
+    public ClassLoader getUserClassLoader() {
+        return userClassLoader;
+    }

Review Comment:
   Do we need to expose this? I think we should get the classloader from the 
ResourceManager only.



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java:
##########
@@ -1849,6 +1871,17 @@ private TableResultInternal 
dropSystemFunction(DropTempSystemFunctionOperation o
         }
     }
 
+    private static void mergePipelineJarsToConfig(Set<URL> jarUrls, 
Configuration configuration) {

Review Comment:
   nit: I think we don't need static here.



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/resource/ResourceManager.java:
##########
@@ -119,19 +139,44 @@ public Set<URL> getJarResourceURLs() {
                 .collect(Collectors.toSet());
     }
 
-    private void checkResource(Path path) throws IOException {
-        FileSystem fs = path.getFileSystem();
-        // check resource exists firstly
-        if (!fs.exists(path)) {
-            throw new FileNotFoundException(String.format("Resource [%s] not 
found.", path));
-        }
-
-        // register directory is not allowed for resource
-        if (fs.getFileStatus(path).isDir()) {
+    private void checkJarResource(List<ResourceUri> resourceUris) throws 
IOException {

Review Comment:
   checkJarResource -> checkJarResources.



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/utils/ResourceUtils.java:
##########
@@ -0,0 +1,40 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.table.utils;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.resource.ResourceManager;
+import org.apache.flink.util.MutableURLClassLoader;
+
+import java.net.URL;
+
+/** Utilities for register resource. */
+public class ResourceUtils {
+
+    /** The tool method to create {@link ResourceManager}. */
+    public static ResourceManager createResourceManager(
+            URL[] urls, ClassLoader parent, Configuration configuration) {
+        MutableURLClassLoader mutableURLClassLoader =
+                MutableURLClassLoader.newInstance(urls, parent, configuration);
+        return new ResourceManager(configuration, mutableURLClassLoader);
+    }

Review Comment:
   Why not add a newInstance method for ResourceManager itself?



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/resource/ResourceManager.java:
##########
@@ -69,43 +74,58 @@ public URLClassLoader getUserClassLoader() {
         return userClassLoader;
     }
 
-    public void registerResource(ResourceUri resourceUri) throws IOException {
-        // check whether the resource has been registered
-        if (resourceInfos.containsKey(resourceUri)) {
-            LOG.info(
-                    "Resource [{}] has been registered, overwriting of 
registered resource is not supported "
-                            + "in the current version, skipping.",
-                    resourceUri.getUri());
-            return;
-        }
+    /**
+     * Due to anyone of the resource in list maybe fail during register, so we 
should stage it
+     * before actual register to guarantee transaction process. If all the 
resources are avaliable,
+     * register them into the {@link ResourceManager}.
+     */
+    public void registerJarResource(List<ResourceUri> resourceUris) throws 
IOException {
+        // check jar resource before register
+        checkJarResource(resourceUris);
 
-        // here can check whether the resource path is valid
-        Path path = new Path(resourceUri.getUri());
-        // check resource firstly
-        checkResource(path);
-
-        URL localUrl;
-        // check resource scheme
-        String scheme = StringUtils.lowerCase(path.toUri().getScheme());
-        // download resource to local path firstly if in remote
-        if (scheme != null && !"file".equals(scheme)) {
-            localUrl = downloadResource(path);
-        } else {
-            localUrl = getURLFromPath(path);
-        }
+        Map<ResourceUri, URL> stagingResourceLocalURLs = new HashMap<>();
+        for (ResourceUri resourceUri : resourceUris) {
+            // check whether the resource has been registered
+            if (resourceInfos.containsKey(resourceUri) && 
resourceInfos.get(resourceUri) != null) {
+                LOG.info(
+                        "Resource [{}] has been registered, overwriting of 
registered resource is not supported "
+                                + "in the current version, skipping.",
+                        resourceUri.getUri());
+                continue;
+            }
+
+            // here can check whether the resource path is valid
+            Path path = new Path(resourceUri.getUri());
+            URL localUrl;
+            // check resource scheme
+            String scheme = StringUtils.lowerCase(path.toUri().getScheme());
+            // download resource to local path firstly if in remote
+            if (scheme != null && !FILE_SCHEME.equals(scheme)) {
+                localUrl = downloadResource(path);
+            } else {
+                localUrl = getURLFromPath(path);
+                // if the local jar resource is a relative path, here convert 
it to absolute path
+                // before register
+                resourceUri = new ResourceUri(ResourceType.JAR, 
localUrl.getPath());
+            }
 
-        // only need add jar resource to classloader
-        if (ResourceType.JAR.equals(resourceUri.getResourceType())) {
-            // check the Jar file firstly
+            // check the local jar file extra

Review Comment:
   What does "extra" mean in this comment?



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/resource/ResourceManager.java:
##########
@@ -119,19 +139,44 @@ public Set<URL> getJarResourceURLs() {
                 .collect(Collectors.toSet());
     }
 
-    private void checkResource(Path path) throws IOException {
-        FileSystem fs = path.getFileSystem();
-        // check resource exists firstly
-        if (!fs.exists(path)) {
-            throw new FileNotFoundException(String.format("Resource [%s] not 
found.", path));
-        }
-
-        // register directory is not allowed for resource
-        if (fs.getFileStatus(path).isDir()) {
+    private void checkJarResource(List<ResourceUri> resourceUris) throws 
IOException {
+        // only support register jar resource
+        if (resourceUris.stream()
+                .anyMatch(resourceUri -> 
!ResourceType.JAR.equals(resourceUri.getResourceType()))) {
             throw new IOException(
                     String.format(
-                            "The resource [%s] is a directory, however, the 
directory is not allowed for registering resource.",
-                            path));
+                            "Only support to register jar resource, resource 
info:\n %s.",
+                            resourceUris.stream()
+                                    .map(ResourceUri::getUri)
+                                    .collect(Collectors.joining(",\n"))));
+        }
+
+        for (ResourceUri resourceUri : resourceUris) {
+            // here can check whether the resource path is valid
+            Path path = new Path(resourceUri.getUri());
+            // file name should end with .jar suffix
+            String fileExtension = Files.getFileExtension(path.getName());
+            if (!fileExtension.toLowerCase().endsWith(JAR_SUFFIX)) {
+                throw new IOException(

Review Comment:
   IOException does not seem suitable here. How about ValidationException?



##########
flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/resource/ResourceManagerTest.java:
##########
@@ -106,30 +145,48 @@ public void testRegisterResource() throws Exception {
 
     @Test
     public void testRegisterInvalidResource() throws Exception {
-        ResourceManager resourceManager = createResourceManager(new URL[0]);
+        ResourceManager resourceManager =
+                ResourceUtils.createResourceManager(
+                        new URL[0], getClass().getClassLoader(), new 
Configuration());
 
         // test register non-exist file
-        final String fileUri =
-                temporaryFolder.getRoot().getPath() + Path.SEPARATOR + 
"test-non-exist-file";
+        final String fileUri = temporaryFolder.getRoot().getPath() + 
Path.SEPARATOR + "test-file";
+
+        CommonTestUtils.assertThrows(
+                String.format(
+                        "Only support to register jar resource, resource 
info:\n" + " %s.",

Review Comment:
                           "Only support to register jar resource, resource 
info:\n %s.",
   



##########
flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/resource/ResourceManagerTest.java:
##########
@@ -83,9 +82,49 @@ public void testRegisterResource() throws Exception {
                 ClassNotFoundException.class,
                 () -> Class.forName(LOWER_UDF_CLASS, false, userClassLoader));
 
+        ResourceUri resourceUri = new ResourceUri(ResourceType.JAR, 
udfJar.getPath());
         // register the same jar repeatedly
-        resourceManager.registerResource(new ResourceUri(ResourceType.JAR, 
udfJar.getPath()));
-        resourceManager.registerResource(new ResourceUri(ResourceType.JAR, 
udfJar.getPath()));
+        resourceManager.registerJarResource(Arrays.asList(resourceUri, 
resourceUri));
+
+        // assert resource infos
+        Map<ResourceUri, URL> expected =
+                Collections.singletonMap(
+                        resourceUri, resourceManager.getURLFromPath(new 
Path(udfJar.getPath())));
+
+        assertEquals(expected, resourceManager.getResources());
+
+        // test load class
+        final Class<?> clazz1 = Class.forName(LOWER_UDF_CLASS, false, 
userClassLoader);
+        final Class<?> clazz2 = Class.forName(LOWER_UDF_CLASS, false, 
userClassLoader);
+
+        assertEquals(clazz1, clazz2);
+
+        resourceManager.close();
+    }
+
+    @Test
+    public void testRegisterResourceWithRelativePath() throws Exception {
+        ResourceManager resourceManager =
+                ResourceUtils.createResourceManager(
+                        new URL[0], getClass().getClassLoader(), new 
Configuration());
+        URLClassLoader userClassLoader = resourceManager.getUserClassLoader();
+
+        // test class loading before register resource
+        CommonTestUtils.assertThrows(
+                String.format("LowerUDF"),
+                ClassNotFoundException.class,
+                () -> Class.forName(LOWER_UDF_CLASS, false, userClassLoader));
+
+        ResourceUri resourceUri =
+                new ResourceUri(
+                        ResourceType.JAR,
+                        new File(".")
+                                .getCanonicalFile()
+                                .toPath()
+                                .relativize(udfJar.toPath())
+                                .toString());
+        // register the same jar repeatedly

Review Comment:
   Is this comment wrong? This is the first time the resource is registered.



##########
flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/resource/ResourceManagerTest.java:
##########
@@ -83,9 +82,49 @@ public void testRegisterResource() throws Exception {
                 ClassNotFoundException.class,
                 () -> Class.forName(LOWER_UDF_CLASS, false, userClassLoader));
 
+        ResourceUri resourceUri = new ResourceUri(ResourceType.JAR, 
udfJar.getPath());
         // register the same jar repeatedly
-        resourceManager.registerResource(new ResourceUri(ResourceType.JAR, 
udfJar.getPath()));
-        resourceManager.registerResource(new ResourceUri(ResourceType.JAR, 
udfJar.getPath()));
+        resourceManager.registerJarResource(Arrays.asList(resourceUri, 
resourceUri));
+
+        // assert resource infos
+        Map<ResourceUri, URL> expected =
+                Collections.singletonMap(
+                        resourceUri, resourceManager.getURLFromPath(new 
Path(udfJar.getPath())));
+
+        assertEquals(expected, resourceManager.getResources());
+
+        // test load class
+        final Class<?> clazz1 = Class.forName(LOWER_UDF_CLASS, false, 
userClassLoader);
+        final Class<?> clazz2 = Class.forName(LOWER_UDF_CLASS, false, 
userClassLoader);
+
+        assertEquals(clazz1, clazz2);
+
+        resourceManager.close();
+    }
+
+    @Test
+    public void testRegisterResourceWithRelativePath() throws Exception {
+        ResourceManager resourceManager =
+                ResourceUtils.createResourceManager(
+                        new URL[0], getClass().getClassLoader(), new 
Configuration());
+        URLClassLoader userClassLoader = resourceManager.getUserClassLoader();
+
+        // test class loading before register resource
+        CommonTestUtils.assertThrows(
+                String.format("LowerUDF"),

Review Comment:
   No need for `String.format`.



##########
flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/resource/ResourceManagerTest.java:
##########
@@ -83,9 +82,49 @@ public void testRegisterResource() throws Exception {
                 ClassNotFoundException.class,
                 () -> Class.forName(LOWER_UDF_CLASS, false, userClassLoader));
 
+        ResourceUri resourceUri = new ResourceUri(ResourceType.JAR, 
udfJar.getPath());
         // register the same jar repeatedly
-        resourceManager.registerResource(new ResourceUri(ResourceType.JAR, 
udfJar.getPath()));
-        resourceManager.registerResource(new ResourceUri(ResourceType.JAR, 
udfJar.getPath()));
+        resourceManager.registerJarResource(Arrays.asList(resourceUri, 
resourceUri));
+
+        // assert resource infos
+        Map<ResourceUri, URL> expected =
+                Collections.singletonMap(
+                        resourceUri, resourceManager.getURLFromPath(new 
Path(udfJar.getPath())));
+
+        assertEquals(expected, resourceManager.getResources());
+
+        // test load class
+        final Class<?> clazz1 = Class.forName(LOWER_UDF_CLASS, false, 
userClassLoader);
+        final Class<?> clazz2 = Class.forName(LOWER_UDF_CLASS, false, 
userClassLoader);

Review Comment:
   The code is the same. What is the motivation of the test?



##########
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/resource/ClientResourceManager.java:
##########
@@ -0,0 +1,49 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.table.client.resource;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.resource.ResourceManager;
+import org.apache.flink.table.resource.ResourceType;
+import org.apache.flink.table.resource.ResourceUri;
+import org.apache.flink.util.MutableURLClassLoader;
+
+import java.net.URL;
+
+/**
+ * This is only used by SqlClient, which expose {@code removeURL} method to 
support {@code REMOVE
+ * JAR} clause.
+ */

Review Comment:
   
   ```
   /**
    * The {@link ClientResourceManager} is able to remove the registered JAR 
resources with the
    * specified jar path.
    *
    * <p>After removing the JAR resource, the {@link ResourceManager} is able 
to register the JAR
    * resource with the same JAR path. Please notice that the removal doesn't 
promise the loaded {@link
    * Class} from the removed jar is inaccessible.
    */
   
   ```



##########
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/resource/ClientResourceManager.java:
##########
@@ -0,0 +1,49 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.table.client.resource;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.resource.ResourceManager;
+import org.apache.flink.table.resource.ResourceType;
+import org.apache.flink.table.resource.ResourceUri;
+import org.apache.flink.util.MutableURLClassLoader;
+
+import java.net.URL;
+
+/**
+ * This is only used by SqlClient, which expose {@code removeURL} method to 
support {@code REMOVE
+ * JAR} clause.
+ */
+@Internal
+public class ClientResourceManager extends ResourceManager {
+
+    public ClientResourceManager(Configuration config, MutableURLClassLoader 
userClassLoader) {
+        super(config, userClassLoader);
+    }
+
+    /**
+     * The method is only used to SqlClient for supporting remove jar syntax. 
SqlClient must
+     * guarantee also remove the jar from userClassLoader because it is {@code
+     * ClientMutableURLClassLoader}.
+     */
+    public URL unregisterJarResource(String jarPath) {

Review Comment:
   add `@Nullable`



##########
flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/util/ClientMutableURLClassLoaderTest.java:
##########
@@ -0,0 +1,145 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.table.client.util;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.util.ClientMutableURLClassLoader;
+import org.apache.flink.util.MutableURLClassLoader;
+import org.apache.flink.util.UserClassLoaderJarTestUtils;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.net.URL;
+import java.util.HashMap;
+import java.util.Map;
+
+import static 
org.apache.flink.table.client.gateway.utils.UserDefinedFunctions.GENERATED_LOWER_UDF_CLASS;
+import static 
org.apache.flink.table.client.gateway.utils.UserDefinedFunctions.GENERATED_LOWER_UDF_CODE;
+import static 
org.apache.flink.table.client.gateway.utils.UserDefinedFunctions.GENERATED_UPPER_UDF_CLASS;
+import static 
org.apache.flink.table.client.gateway.utils.UserDefinedFunctions.GENERATED_UPPER_UDF_CODE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/** Tests for classloading and class loader utilities. */
+public class ClientMutableURLClassLoaderTest {
+
+    @TempDir private static File tempDir;
+
+    private static File userJar;
+
+    @BeforeAll
+    public static void prepare() throws Exception {
+        Map<String, String> classNameCodes = new HashMap<>();
+        classNameCodes.put(GENERATED_LOWER_UDF_CLASS, 
GENERATED_LOWER_UDF_CODE);
+        classNameCodes.put(GENERATED_UPPER_UDF_CLASS, 
GENERATED_UPPER_UDF_CODE);
+        userJar =
+                UserClassLoaderJarTestUtils.createJarFile(
+                        tempDir, "test-classloader.jar", classNameCodes);
+    }
+
+    @Test
+    public void testClassLoadingByAddURL() throws Exception {
+        Configuration configuration = new Configuration();
+        final ClientMutableURLClassLoader classLoader =
+                new ClientMutableURLClassLoader(
+                        configuration,
+                        MutableURLClassLoader.newInstance(
+                                new URL[0], getClass().getClassLoader(), 
configuration));
+
+        // test class loader before add jar url to ClassLoader
+        assertClassNotFoundException(GENERATED_LOWER_UDF_CLASS, false, 
classLoader);
+
+        // add jar url to ClassLoader
+        classLoader.addURL(userJar.toURI().toURL());
+
+        assertTrue(classLoader.getURLs().length == 1);
+
+        final Class<?> clazz1 = Class.forName(GENERATED_LOWER_UDF_CLASS, 
false, classLoader);
+        final Class<?> clazz2 = Class.forName(GENERATED_LOWER_UDF_CLASS, 
false, classLoader);
+
+        assertEquals(clazz1, clazz2);

Review Comment:
   I think just loading GENERATED_LOWER_UDF_CLASS is enough for the test.



##########
flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/util/ClientMutableURLClassLoaderTest.java:
##########
@@ -0,0 +1,145 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.table.client.util;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.util.ClientMutableURLClassLoader;
+import org.apache.flink.util.MutableURLClassLoader;
+import org.apache.flink.util.UserClassLoaderJarTestUtils;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.net.URL;
+import java.util.HashMap;
+import java.util.Map;
+
+import static 
org.apache.flink.table.client.gateway.utils.UserDefinedFunctions.GENERATED_LOWER_UDF_CLASS;
+import static 
org.apache.flink.table.client.gateway.utils.UserDefinedFunctions.GENERATED_LOWER_UDF_CODE;
+import static 
org.apache.flink.table.client.gateway.utils.UserDefinedFunctions.GENERATED_UPPER_UDF_CLASS;
+import static 
org.apache.flink.table.client.gateway.utils.UserDefinedFunctions.GENERATED_UPPER_UDF_CODE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/** Tests for classloading and class loader utilities. */
+public class ClientMutableURLClassLoaderTest {
+
+    @TempDir private static File tempDir;
+
+    private static File userJar;
+
+    @BeforeAll
+    public static void prepare() throws Exception {
+        Map<String, String> classNameCodes = new HashMap<>();
+        classNameCodes.put(GENERATED_LOWER_UDF_CLASS, 
GENERATED_LOWER_UDF_CODE);
+        classNameCodes.put(GENERATED_UPPER_UDF_CLASS, 
GENERATED_UPPER_UDF_CODE);
+        userJar =
+                UserClassLoaderJarTestUtils.createJarFile(
+                        tempDir, "test-classloader.jar", classNameCodes);
+    }
+
+    @Test
+    public void testClassLoadingByAddURL() throws Exception {
+        Configuration configuration = new Configuration();
+        final ClientMutableURLClassLoader classLoader =
+                new ClientMutableURLClassLoader(
+                        configuration,
+                        MutableURLClassLoader.newInstance(
+                                new URL[0], getClass().getClassLoader(), 
configuration));
+
+        // test class loader before add jar url to ClassLoader
+        assertClassNotFoundException(GENERATED_LOWER_UDF_CLASS, false, 
classLoader);
+
+        // add jar url to ClassLoader
+        classLoader.addURL(userJar.toURI().toURL());
+
+        assertTrue(classLoader.getURLs().length == 1);
+
+        final Class<?> clazz1 = Class.forName(GENERATED_LOWER_UDF_CLASS, 
false, classLoader);
+        final Class<?> clazz2 = Class.forName(GENERATED_LOWER_UDF_CLASS, 
false, classLoader);
+
+        assertEquals(clazz1, clazz2);
+
+        classLoader.close();
+    }
+
+    @Test
+    public void testClassLoadingByRemoveURL() throws Exception {
+        URL jarURL = userJar.toURI().toURL();
+        Configuration configuration = new Configuration();
+        final ClientMutableURLClassLoader classLoader =
+                new ClientMutableURLClassLoader(
+                        configuration,
+                        MutableURLClassLoader.newInstance(
+                                new URL[] {jarURL}, 
getClass().getClassLoader(), configuration));
+
+        final Class<?> clazz1 = Class.forName(GENERATED_LOWER_UDF_CLASS, 
false, classLoader);
+        final Class<?> clazz2 = Class.forName(GENERATED_LOWER_UDF_CLASS, 
false, classLoader);
+        assertEquals(clazz1, clazz2);
+
+        // remove jar url
+        classLoader.removeURL(jarURL);
+
+        assertTrue(classLoader.getURLs().length == 0);
+
+        // test class loader after remove jar url from ClassLoader
+        assertClassNotFoundException(GENERATED_UPPER_UDF_CLASS, false, 
classLoader);
+
+        // add jar url to ClassLoader again
+        classLoader.addURL(jarURL);
+
+        assertTrue(classLoader.getURLs().length == 1);
+
+        final Class<?> clazz3 = Class.forName(GENERATED_UPPER_UDF_CLASS, 
false, classLoader);
+        final Class<?> clazz4 = Class.forName(GENERATED_UPPER_UDF_CLASS, 
false, classLoader);
+        assertEquals(clazz3, clazz4);
+
+        classLoader.close();
+    }
+
+    @Test
+    public void testParallelCapable() {
+        // It will be true only if all the super classes (except class Object) 
of the caller are
+        // registered as parallel capable.
+        assertTrue(TestClientMutableURLClassLoader.isParallelCapable);
+    }
+
+    private void assertClassNotFoundException(
+            String className, boolean initialize, ClassLoader classLoader) {
+        CommonTestUtils.assertThrows(
+                className,
+                ClassNotFoundException.class,
+                () -> Class.forName(className, initialize, classLoader));
+    }
+
+    private static class TestClientMutableURLClassLoader extends 
ClientMutableURLClassLoader {
+        public static boolean isParallelCapable;
+
+        static {
+            isParallelCapable = ClassLoader.registerAsParallelCapable();
+        }

Review Comment:
   Why not use:
   
   ```
           public static boolean isParallelCapable = 
ClassLoader.registerAsParallelCapable();
   ```



##########
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/resource/ClientResourceManager.java:
##########
@@ -0,0 +1,49 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.table.client.resource;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.resource.ResourceManager;
+import org.apache.flink.table.resource.ResourceType;
+import org.apache.flink.table.resource.ResourceUri;
+import org.apache.flink.util.MutableURLClassLoader;
+
+import java.net.URL;
+
+/**
+ * This is only used by SqlClient, which expose {@code removeURL} method to 
support {@code REMOVE
+ * JAR} clause.
+ */
+@Internal
+public class ClientResourceManager extends ResourceManager {
+
+    public ClientResourceManager(Configuration config, MutableURLClassLoader 
userClassLoader) {
+        super(config, userClassLoader);
+    }
+
+    /**
+     * The method is only used to SqlClient for supporting remove jar syntax. 
SqlClient must
+     * guarantee also remove the jar from userClassLoader because it is {@code
+     * ClientMutableURLClassLoader}.
+     */
+    public URL unregisterJarResource(String jarPath) {

Review Comment:
   Remove the java doc. Actually we can't promise others will not use this.



##########
flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/util/ClientMutableURLClassLoaderTest.java:
##########
@@ -0,0 +1,145 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.table.client.util;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.util.ClientMutableURLClassLoader;
+import org.apache.flink.util.MutableURLClassLoader;
+import org.apache.flink.util.UserClassLoaderJarTestUtils;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.net.URL;
+import java.util.HashMap;
+import java.util.Map;
+
+import static 
org.apache.flink.table.client.gateway.utils.UserDefinedFunctions.GENERATED_LOWER_UDF_CLASS;
+import static 
org.apache.flink.table.client.gateway.utils.UserDefinedFunctions.GENERATED_LOWER_UDF_CODE;
+import static 
org.apache.flink.table.client.gateway.utils.UserDefinedFunctions.GENERATED_UPPER_UDF_CLASS;
+import static 
org.apache.flink.table.client.gateway.utils.UserDefinedFunctions.GENERATED_UPPER_UDF_CODE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/** Tests for classloading and class loader utilities. */
+public class ClientMutableURLClassLoaderTest {
+
+    @TempDir private static File tempDir;
+
+    private static File userJar;
+
+    @BeforeAll
+    public static void prepare() throws Exception {

Review Comment:
   
   ```
       public static void prepare(@TempDir File tempDir) 
   
   ```



##########
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/SessionContext.java:
##########
@@ -235,69 +231,58 @@ public static SessionContext create(DefaultContext 
defaultContext, String sessio
                                         settings.getBuiltInDatabaseName()))
                         .build();
 
-        FunctionCatalog functionCatalog =
-                new FunctionCatalog(configuration, catalogManager, 
moduleManager, classLoader);
-        SessionState sessionState =
-                new SessionState(catalogManager, moduleManager, 
functionCatalog);
+        final FunctionCatalog functionCatalog =
+                new FunctionCatalog(configuration, resourceManager, 
catalogManager, moduleManager);
+        final SessionState sessionState =
+                new SessionState(catalogManager, moduleManager, 
resourceManager, functionCatalog);
 
         // 
--------------------------------------------------------------------------------------------------------------
         // Init ExecutionContext
         // 
--------------------------------------------------------------------------------------------------------------
 
         ExecutionContext executionContext =
-                new ExecutionContext(configuration, classLoader, sessionState);
+                new ExecutionContext(configuration, userClassLoader, 
sessionState);
 
         return new SessionContext(
                 defaultContext,
                 sessionId,
                 configuration,
-                classLoader,
+                userClassLoader,
                 sessionState,
                 executionContext);
     }
 
     public void addJar(String jarPath) {
-        URL jarURL = getURLFromPath(jarPath, "SQL Client only supports to add 
local jars.");
-        if (dependencies.contains(jarURL)) {
-            return;
+        checkJarPath(jarPath, "SQL Client only supports to add local jars.");
+        try {
+            sessionState.resourceManager.registerJarResource(
+                    Collections.singletonList(new 
ResourceUri(ResourceType.JAR, jarPath)));
+        } catch (IOException e) {
+            LOG.warn(String.format("Could not register the specified jar 
[%s].", jarPath), e);
         }
-
-        // merge the jars in config with the jars maintained in session
-        Set<URL> jarsInConfig = getJarsInConfig();
-
-        Set<URL> newDependencies = new HashSet<>(dependencies);
-        newDependencies.addAll(jarsInConfig);
-        newDependencies.add(jarURL);
-        updateClassLoaderAndDependencies(newDependencies);
-
-        // renew the execution context
-        executionContext = new ExecutionContext(sessionConfiguration, 
classLoader, sessionState);
     }
 
     public void removeJar(String jarPath) {
-        URL jarURL = getURLFromPath(jarPath, "SQL Client only supports to 
remove local jars.");
-        if (!dependencies.contains(jarURL)) {
+        // if is relative path, convert to absolute path
+        URL jarURL = checkJarPath(jarPath, "SQL Client only supports to remove 
local jars.");
+        // remove jar from resource manager
+        jarURL = 
sessionState.resourceManager.unregisterJarResource(jarURL.getPath());
+        if (jarURL == null) {
             LOG.warn(
                     String.format(
-                            "Could not remove the specified jar because the 
jar path(%s) is not found in session classloader.",
+                            "Could not remove the specified jar because the 
jar path [%s] hadn't registered to classloader.",
                             jarPath));
             return;
         }
-
-        Set<URL> newDependencies = new HashSet<>(dependencies);
-        // merge the jars in config with the jars maintained in session
-        Set<URL> jarsInConfig = getJarsInConfig();
-        newDependencies.addAll(jarsInConfig);
-        newDependencies.remove(jarURL);
-
-        updateClassLoaderAndDependencies(newDependencies);
-
-        // renew the execution context
-        executionContext = new ExecutionContext(sessionConfiguration, 
classLoader, sessionState);
+        // remove jar from classloader
+        classLoader.removeURL(jarURL);
     }
 
     public List<String> listJars() {
-        return 
dependencies.stream().map(URL::getPath).collect(Collectors.toList());
+        return sessionState.resourceManager.getResources().keySet().stream()

Review Comment:
   use `ResourceManager#getJarResourceURLs`



##########
flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/util/ClientMutableURLClassLoaderTest.java:
##########
@@ -0,0 +1,145 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.table.client.util;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.util.ClientMutableURLClassLoader;
+import org.apache.flink.util.MutableURLClassLoader;
+import org.apache.flink.util.UserClassLoaderJarTestUtils;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.net.URL;
+import java.util.HashMap;
+import java.util.Map;
+
+import static 
org.apache.flink.table.client.gateway.utils.UserDefinedFunctions.GENERATED_LOWER_UDF_CLASS;
+import static 
org.apache.flink.table.client.gateway.utils.UserDefinedFunctions.GENERATED_LOWER_UDF_CODE;
+import static 
org.apache.flink.table.client.gateway.utils.UserDefinedFunctions.GENERATED_UPPER_UDF_CLASS;
+import static 
org.apache.flink.table.client.gateway.utils.UserDefinedFunctions.GENERATED_UPPER_UDF_CODE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/** Tests for classloading and class loader utilities. */
+public class ClientMutableURLClassLoaderTest {
+
+    @TempDir private static File tempDir;
+
+    private static File userJar;
+
+    @BeforeAll
+    public static void prepare() throws Exception {
+        Map<String, String> classNameCodes = new HashMap<>();
+        classNameCodes.put(GENERATED_LOWER_UDF_CLASS, 
GENERATED_LOWER_UDF_CODE);
+        classNameCodes.put(GENERATED_UPPER_UDF_CLASS, 
GENERATED_UPPER_UDF_CODE);
+        userJar =
+                UserClassLoaderJarTestUtils.createJarFile(
+                        tempDir, "test-classloader.jar", classNameCodes);
+    }
+
+    @Test
+    public void testClassLoadingByAddURL() throws Exception {
+        Configuration configuration = new Configuration();
+        final ClientMutableURLClassLoader classLoader =
+                new ClientMutableURLClassLoader(
+                        configuration,
+                        MutableURLClassLoader.newInstance(
+                                new URL[0], getClass().getClassLoader(), 
configuration));
+
+        // test class loader before add jar url to ClassLoader
+        assertClassNotFoundException(GENERATED_LOWER_UDF_CLASS, false, 
classLoader);
+
+        // add jar url to ClassLoader
+        classLoader.addURL(userJar.toURI().toURL());
+
+        assertTrue(classLoader.getURLs().length == 1);

Review Comment:
   use assertEquals to eliminate the warnings.



##########
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/resource/ClientResourceManager.java:
##########
@@ -0,0 +1,49 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.table.client.resource;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.resource.ResourceManager;
+import org.apache.flink.table.resource.ResourceType;
+import org.apache.flink.table.resource.ResourceUri;
+import org.apache.flink.util.MutableURLClassLoader;
+
+import java.net.URL;
+
+/**
+ * This is only used by SqlClient, which expose {@code removeURL} method to 
support {@code REMOVE
+ * JAR} clause.
+ */
+@Internal
+public class ClientResourceManager extends ResourceManager {
+
+    public ClientResourceManager(Configuration config, MutableURLClassLoader 
userClassLoader) {
+        super(config, userClassLoader);
+    }
+
+    /**
+     * The method is only used to SqlClient for supporting remove jar syntax. 
SqlClient must
+     * guarantee also remove the jar from userClassLoader because it is {@code
+     * ClientMutableURLClassLoader}.
+     */
+    public URL unregisterJarResource(String jarPath) {
+        return resourceInfos.remove(new ResourceUri(ResourceType.JAR, 
jarPath));

Review Comment:
   I think we should qualify the jar path here. It might be a relative path.



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/FunctionITCase.java:
##########
@@ -0,0 +1,185 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.batch.sql;
+
+import org.apache.flink.table.api.Table;
+import 
org.apache.flink.table.planner.factories.utils.TestCollectionTableFactory;
+import org.apache.flink.table.planner.runtime.utils.BatchTestBase;
+import org.apache.flink.table.resource.ResourceManagerTest;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.UserClassLoaderJarTestUtils;
+
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for catalog and system functions in a table environment. */
+public class FunctionITCase extends BatchTestBase {
+
+    private static String jarPath;
+
+    @BeforeClass
+    public static void setup() throws Exception {
+        File jarFile =
+                UserClassLoaderJarTestUtils.createJarFile(
+                        TEMPORARY_FOLDER.newFolder("test-jar"),
+                        "test-classloader-udf.jar",
+                        ResourceManagerTest.LOWER_UDF_CLASS,
+                        ResourceManagerTest.LOWER_UDF_CODE);
+        jarPath = jarFile.toURI().toString();
+    }
+
+    @Before
+    @Override
+    public void before() {
+        // override TableEnvironment for every test to clear register jar in 
ResourceManager first
+        overrideTableEnv(null);

Review Comment:
   I think it would be better to create a jar with a different class name per test.



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/BatchTestBase.scala:
##########
@@ -72,6 +72,17 @@ class BatchTestBase extends BatchAbstractTestBase {
     "(?s)From line ([0-9]+),"
       + " column ([0-9]+) to line ([0-9]+), column ([0-9]+): (.*)")
 
+  def overrideTableEnv(classLoader: ClassLoader): Unit = {
+    settings = 
EnvironmentSettings.newInstance().inBatchMode().withClassLoader(classLoader).build()

Review Comment:
   It seems that the input of `withClassLoader` is nullable. I think we can add 
the corresponding annotation.



##########
flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/UserDefinedFunctions.java:
##########
@@ -29,16 +29,25 @@
 /** A bunch of UDFs for testing the SQL Client. */
 public class UserDefinedFunctions {

Review Comment:
   The class also seems to be used in `ResourceManagerTest`. How about moving 
these useful utilities to the flink-table-api-java package?



##########
flink-core/src/main/java/org/apache/flink/util/MutableURLClassLoader.java:
##########
@@ -31,10 +35,38 @@ public abstract class MutableURLClassLoader extends 
URLClassLoader {
         ClassLoader.registerAsParallelCapable();
     }
 
+    /**
+     * Creates a new instance of MutableURLClassLoader subclass for the 
specified URLs, parent class
+     * loader and configuration.
+     */
+    public static MutableURLClassLoader newInstance(
+            final URL[] urls, final ClassLoader parent, final Configuration 
configuration) {
+        final String[] alwaysParentFirstLoaderPatterns =
+                CoreOptions.getParentFirstLoaderPatterns(configuration);
+        final String classLoaderResolveOrder =
+                configuration.getString(CoreOptions.CLASSLOADER_RESOLVE_ORDER);
+        final FlinkUserCodeClassLoaders.ResolveOrder resolveOrder =
+                
FlinkUserCodeClassLoaders.ResolveOrder.fromString(classLoaderResolveOrder);
+        final boolean checkClassloaderLeak =
+                configuration.getBoolean(CoreOptions.CHECK_LEAKED_CLASSLOADER);
+        return FlinkUserCodeClassLoaders.create(
+                resolveOrder,
+                urls,
+                parent,
+                alwaysParentFirstLoaderPatterns,
+                NOOP_EXCEPTION_HANDLER,
+                checkClassloaderLeak);
+    }
+
     public MutableURLClassLoader(URL[] urls, ClassLoader parent) {
         super(urls, parent);
     }
 
+    @Override
+    protected Class<?> loadClass(String name, boolean resolve) throws 
ClassNotFoundException {
+        return super.loadClass(name, resolve);
+    }
+

Review Comment:
   Hmm, I think the child class can still call it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to