fsk119 commented on code in PR #20001: URL: https://github.com/apache/flink/pull/20001#discussion_r914362571
########## flink-table/flink-sql-client/src/main/java/org/apache/flink/util/ClientMutableURLClassLoader.java: ########## @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.util; + +import org.apache.flink.configuration.Configuration; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URL; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * This class loader extands {@link MutableURLClassLoader}, upon the {@code addURL} method, it also + * exposes a {@code removeURL} method which used to remove unnecessary jar from current classloader + * path. This class loader wraps a {@link MutableURLClassLoader} and an old classloader list, the + * class load is delegated to the inner {@link MutableURLClassLoader}. + * + * <p>This is only used to SqlClient for supporting {@code REMOVE JAR} clause currently. 
When remove + * a jar, get the registered jar url list from current {@link MutableURLClassLoader} firstly, then + * create a new instance of {@link MutableURLClassLoader} which urls doesn't include the removed + * jar, and the currentClassLoader point to new instance object, the old object is added to list to + * be closed when close {@link ClientMutableURLClassLoader}. + * + * <p>Note: This classloader is not guaranteed to actually remove class or resource, any classes or + * resources in the removed jar that are already loaded, are still accessible. + */ +public class ClientMutableURLClassLoader extends MutableURLClassLoader { + + private static final Logger LOG = LoggerFactory.getLogger(ClientMutableURLClassLoader.class); + + static { + ClassLoader.registerAsParallelCapable(); + } + + private final Configuration configuration; Review Comment: The `configuration` field seems unused. Can we remove it? ########## flink-table/flink-sql-client/src/main/java/org/apache/flink/util/ClientMutableURLClassLoader.java: ########## @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.util; + +import org.apache.flink.configuration.Configuration; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URL; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * This class loader extands {@link MutableURLClassLoader}, upon the {@code addURL} method, it also + * exposes a {@code removeURL} method which used to remove unnecessary jar from current classloader + * path. This class loader wraps a {@link MutableURLClassLoader} and an old classloader list, the + * class load is delegated to the inner {@link MutableURLClassLoader}. + * + * <p>This is only used to SqlClient for supporting {@code REMOVE JAR} clause currently. When remove + * a jar, get the registered jar url list from current {@link MutableURLClassLoader} firstly, then + * create a new instance of {@link MutableURLClassLoader} which urls doesn't include the removed + * jar, and the currentClassLoader point to new instance object, the old object is added to list to + * be closed when close {@link ClientMutableURLClassLoader}. + * + * <p>Note: This classloader is not guaranteed to actually remove class or resource, any classes or + * resources in the removed jar that are already loaded, are still accessible. 
+ */ +public class ClientMutableURLClassLoader extends MutableURLClassLoader { + + private static final Logger LOG = LoggerFactory.getLogger(ClientMutableURLClassLoader.class); + + static { + ClassLoader.registerAsParallelCapable(); + } + + private final Configuration configuration; + private final List<MutableURLClassLoader> oldClassLoaders = new ArrayList<>(); + private MutableURLClassLoader currentClassLoader; + + public ClientMutableURLClassLoader( + Configuration configuration, MutableURLClassLoader mutableURLClassLoader) { + super(new URL[0], mutableURLClassLoader); + this.configuration = configuration; + this.currentClassLoader = mutableURLClassLoader; + } + + @Override + protected final Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException { + return currentClassLoader.loadClass(name, resolve); + } + + @Override + public void addURL(URL url) { + currentClassLoader.addURL(url); + } + + public void removeURL(URL url) { + Set<URL> registeredUrls = + Stream.of(currentClassLoader.getURLs()).collect(Collectors.toSet()); + if (!registeredUrls.contains(url)) { + LOG.warn( + String.format( + "Could not remove the specified jar because the jar path [%s] is not found in classloader.", + url)); + return; + } + + // add current classloader to list + oldClassLoaders.add(currentClassLoader); + // remove url from registeredUrls + registeredUrls.remove(url); + // update current classloader point to a new MutableURLClassLoader instance + currentClassLoader = + MutableURLClassLoader.newInstance( + registeredUrls.toArray(new URL[0]), + currentClassLoader.getParent(), + configuration); + } + + @Override + public URL[] getURLs() { + return currentClassLoader.getURLs(); + } + + @Override + public void close() throws IOException { + // close current classloader + currentClassLoader.close(); + // close other classloader in the list + for (MutableURLClassLoader classLoader : oldClassLoaders) { + classLoader.close(); + } Review Comment: It's better we can 
catch each exception and rethrow the caught exception after all inner classloaders are closed. ########## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/resource/ResourceManager.java: ########## @@ -69,43 +70,67 @@ public URLClassLoader getUserClassLoader() { return userClassLoader; } - public void registerResource(ResourceUri resourceUri) throws IOException { - // check whether the resource has been registered - if (resourceInfos.containsKey(resourceUri)) { - LOG.info( - "Resource [{}] has been registered, overwriting of registered resource is not supported " - + "in the current version, skipping.", - resourceUri.getUri()); - return; - } - - // here can check whether the resource path is valid - Path path = new Path(resourceUri.getUri()); - // check resource firstly - checkResource(path); - - URL localUrl; - // check resource scheme - String scheme = StringUtils.lowerCase(path.toUri().getScheme()); - // download resource to local path firstly if in remote - if (scheme != null && !"file".equals(scheme)) { - localUrl = downloadResource(path); - } else { - localUrl = getURLFromPath(path); + public void registerResource(List<ResourceUri> resourceUris) throws IOException { + // Due to anyone of the resource in list maybe fail during register, so we should stage it Review Comment: Convert this to Javadoc. ########## flink-table/flink-sql-client/src/test/java/org/apache/flink/util/ClientMutableURLClassLoaderTest.java: ########## @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.util; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.testutils.CommonTestUtils; + +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.net.URL; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.flink.table.client.gateway.utils.UserDefinedFunctions.GENERATED_LOWER_UDF_CLASS; +import static org.apache.flink.table.client.gateway.utils.UserDefinedFunctions.GENERATED_LOWER_UDF_CODE; +import static org.apache.flink.table.client.gateway.utils.UserDefinedFunctions.GENERATED_UPPER_UDF_CLASS; +import static org.apache.flink.table.client.gateway.utils.UserDefinedFunctions.GENERATED_UPPER_UDF_CODE; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** Tests for classloading and class loader utilities. */ +public class ClientMutableURLClassLoaderTest { + + @ClassRule public static TemporaryFolder temporaryFolder = new TemporaryFolder(); Review Comment: it's better to use juni5 api. ########## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/utils/ResourceUtils.java: ########## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.utils; + +import org.apache.flink.configuration.ConfigUtils; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.PipelineOptions; +import org.apache.flink.table.resource.ResourceManager; +import org.apache.flink.table.resource.ResourceType; +import org.apache.flink.table.resource.ResourceUri; +import org.apache.flink.util.MutableURLClassLoader; + +import java.io.IOException; +import java.net.URL; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +/** Utilities for register resource. */ +public class ResourceUtils { + + /** Review Comment: After reading the discussion about the FLIP, I think you are right: we should set pipeline.jars lazily, because not all resources are used during execution, especially in interactive mode. So I think it's fine not to modify the pipeline options when initializing the session context.
########## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/resource/ResourceManager.java: ########## @@ -69,43 +70,67 @@ public URLClassLoader getUserClassLoader() { return userClassLoader; } - public void registerResource(ResourceUri resourceUri) throws IOException { - // check whether the resource has been registered - if (resourceInfos.containsKey(resourceUri)) { - LOG.info( - "Resource [{}] has been registered, overwriting of registered resource is not supported " - + "in the current version, skipping.", - resourceUri.getUri()); - return; - } - - // here can check whether the resource path is valid - Path path = new Path(resourceUri.getUri()); - // check resource firstly - checkResource(path); - - URL localUrl; - // check resource scheme - String scheme = StringUtils.lowerCase(path.toUri().getScheme()); - // download resource to local path firstly if in remote - if (scheme != null && !"file".equals(scheme)) { - localUrl = downloadResource(path); - } else { - localUrl = getURLFromPath(path); + public void registerResource(List<ResourceUri> resourceUris) throws IOException { Review Comment: add ``` new File(localResourceDir.toString()).deleteOnExit(); ``` in the constructor. ########## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java: ########## @@ -649,9 +649,15 @@ private void validateAndPrepareFunction(CatalogFunction function) } // Skip validation if it's not a UserDefinedFunction. } else if (function.getFunctionLanguage() == FunctionLanguage.JAVA) { + // If the resource of UDF used is not empty, register it to classloader before + // validate. + registerFunctionResource(name, function.getFunctionResources()); Review Comment: It's better we can do the protection more strictly. In the future, we relax the limitation, the developer also is able to realize that we still needs to do something to finish the feature. 
########## flink-table/flink-sql-client/src/main/java/org/apache/flink/util/ClientMutableURLClassLoader.java: ########## @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.util; + +import org.apache.flink.configuration.Configuration; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URL; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * This class loader extands {@link MutableURLClassLoader}, upon the {@code addURL} method, it also + * exposes a {@code removeURL} method which used to remove unnecessary jar from current classloader + * path. This class loader wraps a {@link MutableURLClassLoader} and an old classloader list, the + * class load is delegated to the inner {@link MutableURLClassLoader}. + * + * <p>This is only used to SqlClient for supporting {@code REMOVE JAR} clause currently. 
When remove + * a jar, get the registered jar url list from current {@link MutableURLClassLoader} firstly, then + * create a new instance of {@link MutableURLClassLoader} which urls doesn't include the removed + * jar, and the currentClassLoader point to new instance object, the old object is added to list to + * be closed when close {@link ClientMutableURLClassLoader}. + * + * <p>Note: This classloader is not guaranteed to actually remove class or resource, any classes or + * resources in the removed jar that are already loaded, are still accessible. + */ +public class ClientMutableURLClassLoader extends MutableURLClassLoader { Review Comment: add `@Experimental` ########## flink-core/src/main/java/org/apache/flink/util/MutableURLClassLoader.java: ########## @@ -31,10 +35,38 @@ public abstract class MutableURLClassLoader extends URLClassLoader { ClassLoader.registerAsParallelCapable(); } + /** + * Creates a new instance of MutableURLClassLoader subclass for the specified URLs, parent class + * loader and configuration. 
+ */ + public static MutableURLClassLoader newInstance( + final URL[] urls, final ClassLoader parent, final Configuration configuration) { + final String[] alwaysParentFirstLoaderPatterns = + CoreOptions.getParentFirstLoaderPatterns(configuration); + final String classLoaderResolveOrder = + configuration.getString(CoreOptions.CLASSLOADER_RESOLVE_ORDER); + final FlinkUserCodeClassLoaders.ResolveOrder resolveOrder = + FlinkUserCodeClassLoaders.ResolveOrder.fromString(classLoaderResolveOrder); + final boolean checkClassloaderLeak = + configuration.getBoolean(CoreOptions.CHECK_LEAKED_CLASSLOADER); + return FlinkUserCodeClassLoaders.create( + resolveOrder, + urls, + parent, + alwaysParentFirstLoaderPatterns, + NOOP_EXCEPTION_HANDLER, + checkClassloaderLeak); + } + public MutableURLClassLoader(URL[] urls, ClassLoader parent) { super(urls, parent); } + @Override + protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException { + return super.loadClass(name, resolve); + } + Review Comment: Do we need this if it just uses `super` implementation? 
########## flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/SessionContext.java: ########## @@ -235,69 +233,58 @@ public static SessionContext create(DefaultContext defaultContext, String sessio settings.getBuiltInDatabaseName())) .build(); - FunctionCatalog functionCatalog = - new FunctionCatalog(configuration, catalogManager, moduleManager, classLoader); - SessionState sessionState = - new SessionState(catalogManager, moduleManager, functionCatalog); + final FunctionCatalog functionCatalog = + new FunctionCatalog(configuration, resourceManager, catalogManager, moduleManager); + final SessionState sessionState = + new SessionState(catalogManager, moduleManager, resourceManager, functionCatalog); // -------------------------------------------------------------------------------------------------------------- // Init ExecutionContext // -------------------------------------------------------------------------------------------------------------- ExecutionContext executionContext = - new ExecutionContext(configuration, classLoader, sessionState); + new ExecutionContext(configuration, userClassLoader, sessionState); return new SessionContext( defaultContext, sessionId, configuration, - classLoader, + userClassLoader, sessionState, executionContext); } public void addJar(String jarPath) { - URL jarURL = getURLFromPath(jarPath, "SQL Client only supports to add local jars."); - if (dependencies.contains(jarURL)) { - return; + checkJarPath(jarPath, "SQL Client only supports to add local jars."); + try { + sessionState.resourceManager.registerResource( + Collections.singletonList(new ResourceUri(ResourceType.JAR, jarPath))); + } catch (IOException e) { + LOG.warn(String.format("Could not register the specified jar.", jarPath), e); Review Comment: String.format("Could not register the specified jar %s.", jarPath) ########## flink-table/flink-sql-client/src/main/java/org/apache/flink/util/ClientMutableURLClassLoader.java: ########## @@ 
-0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.util; + +import org.apache.flink.configuration.Configuration; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URL; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * This class loader extands {@link MutableURLClassLoader}, upon the {@code addURL} method, it also Review Comment: extands -> extends ########## flink-core/src/main/java/org/apache/flink/util/MutableURLClassLoader.java: ########## @@ -31,10 +35,38 @@ public abstract class MutableURLClassLoader extends URLClassLoader { ClassLoader.registerAsParallelCapable(); } + /** + * Creates a new instance of MutableURLClassLoader subclass for the specified URLs, parent class Review Comment: remove subclass in the java doc. 
########## flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/SessionContext.java: ########## @@ -71,17 +70,14 @@ public class SessionContext { private final Configuration sessionConfiguration; private final SessionState sessionState; - // SafetyNetWrapperClassLoader doesn't override the getURL therefore we need to maintain the - // dependencies by ourselves. - private Set<URL> dependencies; - private URLClassLoader classLoader; + private ClientMutableURLClassLoader classLoader; Review Comment: use final ########## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/utils/ResourceUtils.java: ########## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.utils; + +import org.apache.flink.configuration.ConfigUtils; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.PipelineOptions; +import org.apache.flink.table.resource.ResourceManager; +import org.apache.flink.table.resource.ResourceType; +import org.apache.flink.table.resource.ResourceUri; +import org.apache.flink.util.MutableURLClassLoader; + +import java.io.IOException; +import java.net.URL; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +/** Utilities for register resource. */ +public class ResourceUtils { + + /** + * This method is used to register the jars which is configured by pipeline.jars option in + * {@link Configuration} to {@link ResourceManager}. + */ + public static void registerPipelineJars( + Configuration configuration, ResourceManager resourceManager) { + Set<String> jarsInConfig = + new HashSet<>( + ConfigUtils.decodeListFromConfig( + configuration, PipelineOptions.JARS, String::toString)); + List<ResourceUri> resourceUris = + jarsInConfig.stream() + .map(jarPath -> new ResourceUri(ResourceType.JAR, jarPath)) + .collect(Collectors.toList()); + try { + resourceManager.registerResource(resourceUris); + } catch (IOException e) { + throw new RuntimeException( + String.format( + "Failed to register pipeline.jars [%s] in configuration to ResourceManager.", + jarsInConfig), + e); + } + } + + /** The tool method to create {@link ResourceManager}. */ + public static ResourceManager createResourceManager( + URL[] urls, ClassLoader parent, Configuration configuration) { + MutableURLClassLoader mutableURLClassLoader = + MutableURLClassLoader.newInstance(urls, parent, configuration); + return new ResourceManager(configuration, mutableURLClassLoader); + } + + /** Private constructor to prevent instantiation. 
*/ + private ResourceUtils() { + throw new RuntimeException(); Review Comment: There is no need to throw an exception here. ########## flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/ExecutionContext.java: ########## @@ -53,12 +54,14 @@ public class ExecutionContext { // Members that should be reused in the same session. private final Configuration flinkConfig; private final SessionState sessionState; - private final URLClassLoader classLoader; + private final MutableURLClassLoader classLoader; Review Comment: After introducing the MutableURLClassLoader, I think we don't need `wrapClassloader` anymore. ########## flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/SessionContext.java: ########## @@ -235,69 +233,58 @@ public static SessionContext create(DefaultContext defaultContext, String sessio settings.getBuiltInDatabaseName())) .build(); - FunctionCatalog functionCatalog = - new FunctionCatalog(configuration, catalogManager, moduleManager, classLoader); - SessionState sessionState = - new SessionState(catalogManager, moduleManager, functionCatalog); + final FunctionCatalog functionCatalog = + new FunctionCatalog(configuration, resourceManager, catalogManager, moduleManager); + final SessionState sessionState = + new SessionState(catalogManager, moduleManager, resourceManager, functionCatalog); // -------------------------------------------------------------------------------------------------------------- // Init ExecutionContext // -------------------------------------------------------------------------------------------------------------- ExecutionContext executionContext = - new ExecutionContext(configuration, classLoader, sessionState); + new ExecutionContext(configuration, userClassLoader, sessionState); return new SessionContext( defaultContext, sessionId, configuration, - classLoader, + userClassLoader, sessionState, executionContext); } public void addJar(String jarPath) { - URL
jarURL = getURLFromPath(jarPath, "SQL Client only supports to add local jars."); - if (dependencies.contains(jarURL)) { - return; + checkJarPath(jarPath, "SQL Client only supports to add local jars."); + try { + sessionState.resourceManager.registerResource( + Collections.singletonList(new ResourceUri(ResourceType.JAR, jarPath))); + } catch (IOException e) { + LOG.warn(String.format("Could not register the specified jar.", jarPath), e); } - - // merge the jars in config with the jars maintained in session - Set<URL> jarsInConfig = getJarsInConfig(); - - Set<URL> newDependencies = new HashSet<>(dependencies); - newDependencies.addAll(jarsInConfig); - newDependencies.add(jarURL); - updateClassLoaderAndDependencies(newDependencies); - - // renew the execution context - executionContext = new ExecutionContext(sessionConfiguration, classLoader, sessionState); } public void removeJar(String jarPath) { - URL jarURL = getURLFromPath(jarPath, "SQL Client only supports to remove local jars."); - if (!dependencies.contains(jarURL)) { + checkJarPath(jarPath, "SQL Client only supports to remove local jars."); + // remove jar from resource manager + URL jarURL = + sessionState.resourceManager.unregisterJarResource( + new ResourceUri(ResourceType.JAR, jarPath)); + if (jarURL == null) { LOG.warn( String.format( - "Could not remove the specified jar because the jar path(%s) is not found in session classloader.", - jarPath)); + "Could not remove the specified jar because the jar path [%s] is not found in classloader.", + jarURL)); return; } - - Set<URL> newDependencies = new HashSet<>(dependencies); - // merge the jars in config with the jars maintained in session - Set<URL> jarsInConfig = getJarsInConfig(); - newDependencies.addAll(jarsInConfig); - newDependencies.remove(jarURL); - - updateClassLoaderAndDependencies(newDependencies); - - // renew the execution context - executionContext = new ExecutionContext(sessionConfiguration, classLoader, sessionState); + // remove jar from 
classloader + classLoader.removeURL(jarURL); } public List<String> listJars() { - return dependencies.stream().map(URL::getPath).collect(Collectors.toList()); + return sessionState.resourceManager.getResources().keySet().stream() Review Comment: resource manager also contains file, archive. In Spark, the list jar only contains added jar files. User uses list archive to show others. [1] https://spark.apache.org/docs/latest/sql-ref-syntax-aux-resource-mgmt-list-archive.html [2] https://spark.apache.org/docs/latest/sql-ref-syntax-aux-resource-mgmt-list-file.html [3] https://spark.apache.org/docs/latest/sql-ref-syntax-aux-resource-mgmt-list-jar.html ########## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/resource/ResourceManager.java: ########## @@ -69,43 +70,67 @@ public URLClassLoader getUserClassLoader() { return userClassLoader; } - public void registerResource(ResourceUri resourceUri) throws IOException { - // check whether the resource has been registered - if (resourceInfos.containsKey(resourceUri)) { - LOG.info( - "Resource [{}] has been registered, overwriting of registered resource is not supported " - + "in the current version, skipping.", - resourceUri.getUri()); - return; - } - - // here can check whether the resource path is valid - Path path = new Path(resourceUri.getUri()); - // check resource firstly - checkResource(path); - - URL localUrl; - // check resource scheme - String scheme = StringUtils.lowerCase(path.toUri().getScheme()); - // download resource to local path firstly if in remote - if (scheme != null && !"file".equals(scheme)) { - localUrl = downloadResource(path); - } else { - localUrl = getURLFromPath(path); + public void registerResource(List<ResourceUri> resourceUris) throws IOException { + // Due to anyone of the resource in list maybe fail during register, so we should stage it + // before actual register to guarantee transaction process. If all the resource check + // successfully, register them in batch. 
+ Map<ResourceUri, URL> stagingResourceLocalURLs = new HashMap<>(); + + for (ResourceUri resourceUri : resourceUris) { + // check whether the resource has been registered + if (resourceInfos.containsKey(resourceUri)) { + LOG.info( + "Resource [{}] has been registered, overwriting of registered resource is not supported " + + "in the current version, skipping.", + resourceUri.getUri()); + continue; + } + + // here can check whether the resource path is valid + Path path = new Path(resourceUri.getUri()); + // check resource firstly + checkResource(path); + + URL localUrl; + // check resource scheme + String scheme = StringUtils.lowerCase(path.toUri().getScheme()); + // download resource to local path firstly if in remote + if (scheme != null && !"file".equals(scheme)) { + localUrl = downloadResource(path); + } else { + localUrl = getURLFromPath(path); + } + + // check the jar resource extra + if (ResourceType.JAR.equals(resourceUri.getResourceType())) { + JarUtils.checkJarFile(localUrl); + } + + // add it to staging map + stagingResourceLocalURLs.put(resourceUri, localUrl); } - // only need add jar resource to classloader - if (ResourceType.JAR.equals(resourceUri.getResourceType())) { - // check the Jar file firstly - JarUtils.checkJarFile(localUrl); - - // add it to classloader - userClassLoader.addURL(localUrl); - LOG.info("Added jar resource [{}] to class path.", localUrl); - } + // register resource in batch + stagingResourceLocalURLs.forEach( + (resourceUri, url) -> { + // jar resource need add to classloader + if (ResourceType.JAR.equals(resourceUri.getResourceType())) { + userClassLoader.addURL(url); + LOG.info("Added jar resource [{}] to class path.", url); + } + + resourceInfos.put(resourceUri, url); + LOG.info("Register resource [{}] successfully.", resourceUri.getUri()); + }); + } - resourceInfos.put(resourceUri, localUrl); - LOG.info("Register resource [{}] successfully.", resourceUri.getUri()); + /** + * The method is only used to SqlClient for 
supporting remove jar syntax. SqlClient must + guarantee also remove the jar from userClassLoader because it is {@code + ClientMutableURLClassLoader}. + */ + public URL unregisterJarResource(ResourceUri resourceUri) { Review Comment: If it only supports removing jar resources, it's better to change the input parameter to a string `jarPath`. ``` public URL unregisterJarResource(String jarPath) ``` BTW, it's not a good idea to modify a common object just to expose a limited feature. I think we should handle it the same way as `ClientMutableURLClassLoader` does. Maybe we can also keep the resource info in the map, because dropping a function doesn't clean it up either. ########## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/resource/ResourceManager.java: ########## @@ -69,43 +70,67 @@ public URLClassLoader getUserClassLoader() { return userClassLoader; } - public void registerResource(ResourceUri resourceUri) throws IOException { - // check whether the resource has been registered - if (resourceInfos.containsKey(resourceUri)) { - LOG.info( - "Resource [{}] has been registered, overwriting of registered resource is not supported " - + "in the current version, skipping.", - resourceUri.getUri()); - return; - } - - // here can check whether the resource path is valid - Path path = new Path(resourceUri.getUri()); - // check resource firstly - checkResource(path); - - URL localUrl; - // check resource scheme - String scheme = StringUtils.lowerCase(path.toUri().getScheme()); - // download resource to local path firstly if in remote - if (scheme != null && !"file".equals(scheme)) { - localUrl = downloadResource(path); - } else { - localUrl = getURLFromPath(path); + public void registerResource(List<ResourceUri> resourceUris) throws IOException { + // Due to anyone of the resource in list maybe fail during register, so we should stage it + // before actual register to guarantee transaction process. If all the resource check + // successfully, register them in batch.
+ Map<ResourceUri, URL> stagingResourceLocalURLs = new HashMap<>(); + + for (ResourceUri resourceUri : resourceUris) { + // check whether the resource has been registered + if (resourceInfos.containsKey(resourceUri)) { Review Comment: It's better to skip the download only when both the key and the value already exist. ########## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/resource/ResourceManager.java: ########## @@ -69,43 +70,67 @@ public URLClassLoader getUserClassLoader() { return userClassLoader; } - public void registerResource(ResourceUri resourceUri) throws IOException { - // check whether the resource has been registered - if (resourceInfos.containsKey(resourceUri)) { - LOG.info( - "Resource [{}] has been registered, overwriting of registered resource is not supported " - + "in the current version, skipping.", - resourceUri.getUri()); - return; - } - - // here can check whether the resource path is valid - Path path = new Path(resourceUri.getUri()); - // check resource firstly - checkResource(path); - - URL localUrl; - // check resource scheme - String scheme = StringUtils.lowerCase(path.toUri().getScheme()); - // download resource to local path firstly if in remote - if (scheme != null && !"file".equals(scheme)) { - localUrl = downloadResource(path); - } else { - localUrl = getURLFromPath(path); + public void registerResource(List<ResourceUri> resourceUris) throws IOException { + // Due to anyone of the resource in list maybe fail during register, so we should stage it + // before actual register to guarantee transaction process. If all the resource check + // successfully, register them in batch. Review Comment: If all the resources are available, register them into the `ResourceManager`.
########## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/resource/ResourceManager.java: ########## @@ -69,43 +70,67 @@ public URLClassLoader getUserClassLoader() { return userClassLoader; } - public void registerResource(ResourceUri resourceUri) throws IOException { - // check whether the resource has been registered - if (resourceInfos.containsKey(resourceUri)) { - LOG.info( - "Resource [{}] has been registered, overwriting of registered resource is not supported " - + "in the current version, skipping.", - resourceUri.getUri()); - return; - } - - // here can check whether the resource path is valid - Path path = new Path(resourceUri.getUri()); - // check resource firstly - checkResource(path); - - URL localUrl; - // check resource scheme - String scheme = StringUtils.lowerCase(path.toUri().getScheme()); - // download resource to local path firstly if in remote - if (scheme != null && !"file".equals(scheme)) { - localUrl = downloadResource(path); - } else { - localUrl = getURLFromPath(path); + public void registerResource(List<ResourceUri> resourceUris) throws IOException { + // Due to anyone of the resource in list maybe fail during register, so we should stage it + // before actual register to guarantee transaction process. If all the resource check + // successfully, register them in batch. + Map<ResourceUri, URL> stagingResourceLocalURLs = new HashMap<>(); + + for (ResourceUri resourceUri : resourceUris) { Review Comment: It's better to first scan all the URIs to determine whether any of them is illegal, and only then download them one by one, because downloading is a heavy IO operation. The check should verify that: 1. the resource is a JAR 2. the file name ends with ".jar" 3.
the file exists ``` FileSystem sFs = FileSystem.getUnguardedFileSystem(sourcePath.toUri()); return sFs.exists(sourcePath); ``` ########## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java: ########## @@ -662,14 +668,30 @@ private FunctionDefinition getFunctionDefinition(String name, CatalogFunction fu // directly. return ((InlineCatalogFunction) function).getDefinition(); } + // If the resource of UDF used is not empty, register it to classloader before + // validate. + registerFunctionResource(name, function.getFunctionResources()); + return UserDefinedFunctionHelper.instantiateFunction( - classLoader, + resourceManager.getUserClassLoader(), // future config, name, function); } + private void registerFunctionResource(String functionName, List<ResourceUri> resourceUris) { + try { + resourceManager.registerResource(resourceUris); + } catch (IOException e) { + throw new ValidationException( Review Comment: Use TableException here. ########## flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/BatchTestBase.scala: ########## @@ -72,6 +72,17 @@ class BatchTestBase extends BatchAbstractTestBase { "(?s)From line ([0-9]+)," + " column ([0-9]+) to line ([0-9]+), column ([0-9]+): (.*)") + def overrideTableEnv(classLoader: ClassLoader): Unit = { Review Comment: How about just assigning the fields in `before`? It's cleaner than adding a method to override the tEnv.
########## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/resource/ResourceManager.java: ########## @@ -69,43 +70,67 @@ public URLClassLoader getUserClassLoader() { return userClassLoader; } - public void registerResource(ResourceUri resourceUri) throws IOException { - // check whether the resource has been registered - if (resourceInfos.containsKey(resourceUri)) { - LOG.info( - "Resource [{}] has been registered, overwriting of registered resource is not supported " - + "in the current version, skipping.", - resourceUri.getUri()); - return; - } - - // here can check whether the resource path is valid - Path path = new Path(resourceUri.getUri()); - // check resource firstly - checkResource(path); - - URL localUrl; - // check resource scheme - String scheme = StringUtils.lowerCase(path.toUri().getScheme()); - // download resource to local path firstly if in remote - if (scheme != null && !"file".equals(scheme)) { - localUrl = downloadResource(path); - } else { - localUrl = getURLFromPath(path); + public void registerResource(List<ResourceUri> resourceUris) throws IOException { + // Due to anyone of the resource in list maybe fail during register, so we should stage it + // before actual register to guarantee transaction process. If all the resource check + // successfully, register them in batch. 
+ Map<ResourceUri, URL> stagingResourceLocalURLs = new HashMap<>(); + + for (ResourceUri resourceUri : resourceUris) { + // check whether the resource has been registered + if (resourceInfos.containsKey(resourceUri)) { + LOG.info( + "Resource [{}] has been registered, overwriting of registered resource is not supported " + + "in the current version, skipping.", + resourceUri.getUri()); + continue; + } + + // here can check whether the resource path is valid + Path path = new Path(resourceUri.getUri()); + // check resource firstly + checkResource(path); + + URL localUrl; + // check resource scheme + String scheme = StringUtils.lowerCase(path.toUri().getScheme()); + // download resource to local path firstly if in remote + if (scheme != null && !"file".equals(scheme)) { + localUrl = downloadResource(path); + } else { + localUrl = getURLFromPath(path); + } + + // check the jar resource extra + if (ResourceType.JAR.equals(resourceUri.getResourceType())) { + JarUtils.checkJarFile(localUrl); + } + + // add it to staging map + stagingResourceLocalURLs.put(resourceUri, localUrl); } - // only need add jar resource to classloader - if (ResourceType.JAR.equals(resourceUri.getResourceType())) { - // check the Jar file firstly - JarUtils.checkJarFile(localUrl); - - // add it to classloader - userClassLoader.addURL(localUrl); - LOG.info("Added jar resource [{}] to class path.", localUrl); - } + // register resource in batch + stagingResourceLocalURLs.forEach( + (resourceUri, url) -> { + // jar resource need add to classloader + if (ResourceType.JAR.equals(resourceUri.getResourceType())) { + userClassLoader.addURL(url); + LOG.info("Added jar resource [{}] to class path.", url); + } Review Comment: I am a little confused about this. I think all resources should be added into the URLClassloader. If we don't add into the classloader, how can Flink find the resources? 
########## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/resource/ResourceManager.java: ########## @@ -69,43 +70,67 @@ public URLClassLoader getUserClassLoader() { return userClassLoader; } - public void registerResource(ResourceUri resourceUri) throws IOException { - // check whether the resource has been registered - if (resourceInfos.containsKey(resourceUri)) { - LOG.info( - "Resource [{}] has been registered, overwriting of registered resource is not supported " - + "in the current version, skipping.", - resourceUri.getUri()); - return; - } - - // here can check whether the resource path is valid - Path path = new Path(resourceUri.getUri()); - // check resource firstly - checkResource(path); - - URL localUrl; - // check resource scheme - String scheme = StringUtils.lowerCase(path.toUri().getScheme()); - // download resource to local path firstly if in remote - if (scheme != null && !"file".equals(scheme)) { - localUrl = downloadResource(path); - } else { - localUrl = getURLFromPath(path); + public void registerResource(List<ResourceUri> resourceUris) throws IOException { + // Due to anyone of the resource in list maybe fail during register, so we should stage it + // before actual register to guarantee transaction process. If all the resource check + // successfully, register them in batch. 
+ Map<ResourceUri, URL> stagingResourceLocalURLs = new HashMap<>(); + + for (ResourceUri resourceUri : resourceUris) { + // check whether the resource has been registered + if (resourceInfos.containsKey(resourceUri)) { + LOG.info( + "Resource [{}] has been registered, overwriting of registered resource is not supported " + + "in the current version, skipping.", + resourceUri.getUri()); + continue; + } + + // here can check whether the resource path is valid + Path path = new Path(resourceUri.getUri()); + // check resource firstly + checkResource(path); + + URL localUrl; + // check resource scheme + String scheme = StringUtils.lowerCase(path.toUri().getScheme()); + // download resource to local path firstly if in remote + if (scheme != null && !"file".equals(scheme)) { + localUrl = downloadResource(path); + } else { + localUrl = getURLFromPath(path); + } + + // check the jar resource extra + if (ResourceType.JAR.equals(resourceUri.getResourceType())) { + JarUtils.checkJarFile(localUrl); + } + + // add it to staging map + stagingResourceLocalURLs.put(resourceUri, localUrl); } - // only need add jar resource to classloader - if (ResourceType.JAR.equals(resourceUri.getResourceType())) { - // check the Jar file firstly - JarUtils.checkJarFile(localUrl); - - // add it to classloader - userClassLoader.addURL(localUrl); - LOG.info("Added jar resource [{}] to class path.", localUrl); - } + // register resource in batch + stagingResourceLocalURLs.forEach( + (resourceUri, url) -> { + // jar resource need add to classloader + if (ResourceType.JAR.equals(resourceUri.getResourceType())) { + userClassLoader.addURL(url); + LOG.info("Added jar resource [{}] to class path.", url); + } + + resourceInfos.put(resourceUri, url); Review Comment: It's not intuitive that, in the local file system, we cannot unregister a jar by its relative path if it was registered by its absolute path, even though both paths actually refer to the same file.
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
