zhuzhurk commented on code in PR #19860:
URL: https://github.com/apache/flink/pull/19860#discussion_r892164060


##########
flink-core/src/main/java/org/apache/flink/core/classloading/MutableURLClassLoader.java:
##########
@@ -0,0 +1,152 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.core.classloading;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.util.FlinkUserCodeClassLoader;
+import org.apache.flink.util.FlinkUserCodeClassLoaders;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.Arrays;
+import java.util.Enumeration;
+import java.util.HashSet;
+import java.util.Set;
+
+import static 
org.apache.flink.util.FlinkUserCodeClassLoader.NOOP_EXCEPTION_HANDLER;
+
+/**
+ * This classloader delegates to the actual user classloader {@link 
FlinkUserCodeClassLoader}. Upon
+ * the existed function of {@link FlinkUserCodeClassLoader}, it also provides 
extra {@link
+ * #addURL(URL)} and {@link #removeURL(URL)} methods, this allows user to add 
and remove jar
+ * dynamically.
+ *
+ * <p>User can specify the class load order by set the parameter 
(classloader.resolve-order) in
+ * {@link Configuration}, default order is child-first.
+ */
+public class MutableURLClassLoader extends URLClassLoader {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(MutableURLClassLoader.class);
+
+    private final Set<URL> registeredUrls;
+    private final Configuration config;
+    private final ClassLoader parentClassLoader;
+    private volatile FlinkUserCodeClassLoader ownerClassLoader;
+
+    public MutableURLClassLoader(URL[] urls, ClassLoader parent, Configuration 
config) {
+        super(new URL[0], parent);
+        this.registeredUrls = new HashSet<>(Arrays.asList(urls));
+        this.config = config;
+        this.parentClassLoader = parent;
+        this.ownerClassLoader = buildUserCodeClassLoader();
+    }
+
+    private FlinkUserCodeClassLoader buildUserCodeClassLoader() {
+        final String[] alwaysParentFirstLoaderPatterns =
+                CoreOptions.getParentFirstLoaderPatterns(config);
+
+        // According to the specified class load order,child-first or 
parent-first
+        // child-first: load from this classloader firstly, if not found, then 
from parent
+        // parent-first: load from parent firstly, if not found, then from 
this classloader
+        final String classLoaderResolveOrder =
+                config.getString(CoreOptions.CLASSLOADER_RESOLVE_ORDER);
+        FlinkUserCodeClassLoaders.ResolveOrder resolveOrder =
+                
FlinkUserCodeClassLoaders.ResolveOrder.fromString(classLoaderResolveOrder);
+        return (FlinkUserCodeClassLoader)
+                FlinkUserCodeClassLoaders.create(
+                        resolveOrder,
+                        registeredUrls.toArray(new URL[0]),
+                        parentClassLoader,
+                        alwaysParentFirstLoaderPatterns,
+                        NOOP_EXCEPTION_HANDLER,
+                        false);
+    }
+
+    @Override
+    public Class<?> loadClass(String name, boolean resolve) throws 
ClassNotFoundException {
+        try {
+            return ownerClassLoader.loadClass(name, resolve);
+        } catch (ClassNotFoundException cnf) {
+            throw cnf;
+        }
+    }
+
+    @Override
+    public void addURL(URL url) {
+        // add url to registeredUrls
+        registeredUrls.add(url);
+        // add it to ownerClassLoader
+        ownerClassLoader.addURL(url);
+    }
+
+    /**
+     * Here the old ownerClassLoader can't be close directly because of java 
class loader's
+     * all-inclusive responsibility delegation mechanism. The mechanism is 
that when a ClassLoder
+     * loads a class, unless another ClassLoder is specified explicitly, the 
classes that the class
+     * depends on and references are also loaded by the ClassLoder. Hence, we 
can't close the old
+     * ownerClassLoader when remove url, it may cause a {@link 
ClassNotFoundException} exception.
+     *
+     * <p>If call this method frequently, it maybe results in a classloader 
leak. Please use this

Review Comment:
   I'm a bit concerned about the side effect. And I do not fully understand the 
benefit of the removal. 
   In my understanding, it is only for table environment to support multiple 
different jobs submission from SQL client/gateway and avoid jar conflicts. If 
so, I would suggest to move the class to table module.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to