This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 1aa7d16c1188dd70dd7cbf1a7e4acf0e0db8cea1 Author: Chesnay Schepler <[email protected]> AuthorDate: Tue Jul 6 16:59:30 2021 +0200 [FLINK-18783] Add ComponentClassLoader The new ComponentClassLoader is intended as a CL that can eventually support all of our class-loading needs, including plugins, rpc systems and user-code. --- .../core/classloading/ComponentClassLoader.java | 266 ++++++++++++++++++++ .../org/apache/flink/core/plugin/PluginLoader.java | 96 +------- .../classloading/ComponentClassLoaderTest.java | 268 +++++++++++++++++++++ 3 files changed, 537 insertions(+), 93 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/core/classloading/ComponentClassLoader.java b/flink-core/src/main/java/org/apache/flink/core/classloading/ComponentClassLoader.java new file mode 100644 index 0000000..39636a6 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/core/classloading/ComponentClassLoader.java @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.core.classloading; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.util.function.FunctionWithException; + +import org.apache.flink.shaded.guava18.com.google.common.collect.Iterators; + +import java.io.IOException; +import java.net.URL; +import java.net.URLClassLoader; +import java.util.Arrays; +import java.util.Enumeration; +import java.util.Iterator; + +/** + * A {@link URLClassLoader} that restricts which classes can be loaded to those contained within the + * given classpath, except classes from a given set of packages that are loaded either + * owner-first or component-first. + * + * <p>Depiction of the class loader hierarchy: + * + * <pre> + * Owner Bootstrap + * ^ ^ + * |---------| + * | + * Component + * </pre> + * + * <p>For loading classes/resources, class loaders are accessed in one of the following orders: + * + * <ul> + * <li>component-only: component -> bootstrap; default. + * <li>component-first: component -> bootstrap -> owner; opt-in. + * <li>owner-first: owner -> component -> bootstrap; opt-in. 
+ * </ul> + */ +public class ComponentClassLoader extends URLClassLoader { + private static final ClassLoader PLATFORM_OR_BOOTSTRAP_LOADER; + + private final ClassLoader ownerClassLoader; + + private final String[] ownerFirstPackages; + private final String[] componentFirstPackages; + private final String[] ownerFirstResourcePrefixes; + private final String[] componentFirstResourcePrefixes; + + public ComponentClassLoader( + URL[] classpath, + ClassLoader ownerClassLoader, + String[] ownerFirstPackages, + String[] componentFirstPackages) { + super(classpath, PLATFORM_OR_BOOTSTRAP_LOADER); + this.ownerClassLoader = ownerClassLoader; + + this.ownerFirstPackages = ownerFirstPackages; + this.componentFirstPackages = componentFirstPackages; + + ownerFirstResourcePrefixes = convertPackagePrefixesToPathPrefixes(ownerFirstPackages); + componentFirstResourcePrefixes = + convertPackagePrefixesToPathPrefixes(componentFirstPackages); + } + + // ---------------------------------------------------------------------------------------------- + // Class loading + // ---------------------------------------------------------------------------------------------- + + @Override + protected Class<?> loadClass(final String name, final boolean resolve) + throws ClassNotFoundException { + synchronized (getClassLoadingLock(name)) { + final Class<?> loadedClass = findLoadedClass(name); + if (loadedClass != null) { + return resolveIfNeeded(resolve, loadedClass); + } + + if (isComponentFirstClass(name)) { + return loadClassFromComponentFirst(name, resolve); + } + if (isOwnerFirstClass(name)) { + return loadClassFromOwnerFirst(name, resolve); + } + + // making this behavior configurable (component-only/component-first/owner-first) would + // allow this class to subsume the FlinkUserCodeClassLoader (with an added exception + // handler) + return loadClassFromComponentOnly(name, resolve); + } + } + + private Class<?> resolveIfNeeded(final boolean resolve, final Class<?> loadedClass) { + if 
(resolve) { + resolveClass(loadedClass); + } + return loadedClass; + } + + private boolean isOwnerFirstClass(final String name) { + return Arrays.stream(ownerFirstPackages).anyMatch(name::startsWith); + } + + private boolean isComponentFirstClass(final String name) { + return Arrays.stream(componentFirstPackages).anyMatch(name::startsWith); + } + + private Class<?> loadClassFromComponentOnly(final String name, final boolean resolve) + throws ClassNotFoundException { + return super.loadClass(name, resolve); + } + + private Class<?> loadClassFromComponentFirst(final String name, final boolean resolve) + throws ClassNotFoundException { + try { + return loadClassFromComponentOnly(name, resolve); + } catch (ClassNotFoundException | NoClassDefFoundError e) { + return loadClassFromOwnerOnly(name, resolve); + } + } + + private Class<?> loadClassFromOwnerOnly(final String name, final boolean resolve) + throws ClassNotFoundException { + return resolveIfNeeded(resolve, ownerClassLoader.loadClass(name)); + } + + private Class<?> loadClassFromOwnerFirst(final String name, final boolean resolve) + throws ClassNotFoundException { + try { + return loadClassFromOwnerOnly(name, resolve); + } catch (ClassNotFoundException | NoClassDefFoundError e) { + return loadClassFromComponentOnly(name, resolve); + } + } + + // ---------------------------------------------------------------------------------------------- + // Resource loading + // ---------------------------------------------------------------------------------------------- + + @Override + public URL getResource(final String name) { + try { + final Enumeration<URL> resources = getResources(name); + if (resources.hasMoreElements()) { + return resources.nextElement(); + } + } catch (IOException ignored) { + // mimics the behavior of the JDK + } + return null; + } + + @Override + public Enumeration<URL> getResources(final String name) throws IOException { + if (isComponentFirstResource(name)) { + return 
loadResourceFromComponentFirst(name); + } + if (isOwnerFirstResource(name)) { + return loadResourceFromOwnerFirst(name); + } + + return loadResourceFromComponentOnly(name); + } + + private boolean isOwnerFirstResource(final String name) { + return Arrays.stream(ownerFirstResourcePrefixes).anyMatch(name::startsWith); + } + + private boolean isComponentFirstResource(final String name) { + return Arrays.stream(componentFirstResourcePrefixes).anyMatch(name::startsWith); + } + + private Enumeration<URL> loadResourceFromComponentOnly(final String name) throws IOException { + return super.getResources(name); + } + + private Enumeration<URL> loadResourceFromComponentFirst(final String name) throws IOException { + return loadResourcesInOrder( + name, this::loadResourceFromComponentOnly, this::loadResourceFromOwnerOnly); + } + + private Enumeration<URL> loadResourceFromOwnerOnly(final String name) throws IOException { + return ownerClassLoader.getResources(name); + } + + private Enumeration<URL> loadResourceFromOwnerFirst(final String name) throws IOException { + return loadResourcesInOrder( + name, this::loadResourceFromOwnerOnly, this::loadResourceFromComponentOnly); + } + + private interface ResourceLoadingFunction + extends FunctionWithException<String, Enumeration<URL>, IOException> {} + + private Enumeration<URL> loadResourcesInOrder( + String name, + ResourceLoadingFunction firstClassLoader, + ResourceLoadingFunction secondClassLoader) + throws IOException { + final Iterator<URL> iterator = + Iterators.concat( + Iterators.forEnumeration(firstClassLoader.apply(name)), + Iterators.forEnumeration(secondClassLoader.apply(name))); + + return new IteratorBackedEnumeration<>(iterator); + } + + @VisibleForTesting + static class IteratorBackedEnumeration<T> implements Enumeration<T> { + private final Iterator<T> backingIterator; + + public IteratorBackedEnumeration(Iterator<T> backingIterator) { + this.backingIterator = backingIterator; + } + + @Override + public boolean 
hasMoreElements() { + return backingIterator.hasNext(); + } + + @Override + public T nextElement() { + return backingIterator.next(); + } + } + + // ---------------------------------------------------------------------------------------------- + // Utils + // ---------------------------------------------------------------------------------------------- + + private static String[] convertPackagePrefixesToPathPrefixes(String[] packagePrefixes) { + return Arrays.stream(packagePrefixes) + .map(packageName -> packageName.replace('.', '/')) + .toArray(String[]::new); + } + + static { + ClassLoader platformLoader = null; + try { + platformLoader = + (ClassLoader) + ClassLoader.class.getMethod("getPlatformClassLoader").invoke(null); + } catch (NoSuchMethodException e) { + // on Java 8 this method does not exist, but using null indicates the bootstrap + // loader that we want to have + } catch (Exception e) { + throw new IllegalStateException("Cannot retrieve platform classloader on Java 9+", e); + } + PLATFORM_OR_BOOTSTRAP_LOADER = platformLoader; + ClassLoader.registerAsParallelCapable(); + } +} diff --git a/flink-core/src/main/java/org/apache/flink/core/plugin/PluginLoader.java b/flink-core/src/main/java/org/apache/flink/core/plugin/PluginLoader.java index 690a2a2..a81877d 100644 --- a/flink-core/src/main/java/org/apache/flink/core/plugin/PluginLoader.java +++ b/flink-core/src/main/java/org/apache/flink/core/plugin/PluginLoader.java @@ -19,6 +19,7 @@ package org.apache.flink.core.plugin; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.core.classloading.ComponentClassLoader; import org.apache.flink.util.ArrayUtils; import org.apache.flink.util.TemporaryClassLoaderContext; @@ -30,8 +31,6 @@ import javax.annotation.concurrent.ThreadSafe; import java.io.IOException; import java.net.URL; import java.net.URLClassLoader; -import java.util.Arrays; -import java.util.Enumeration; import java.util.Iterator; import java.util.ServiceLoader; @@ -146,102 
+145,13 @@ public class PluginLoader implements AutoCloseable { * <p>No class/resource in the system class loader (everything in lib/) can be seen in the * plugin except those starting with a whitelist prefix. */ - private static final class PluginClassLoader extends URLClassLoader { - private static final ClassLoader PLATFORM_OR_BOOTSTRAP_LOADER; - - private final ClassLoader flinkClassLoader; - - private final String[] allowedFlinkPackages; - - private final String[] allowedResourcePrefixes; + private static final class PluginClassLoader extends ComponentClassLoader { PluginClassLoader( URL[] pluginResourceURLs, ClassLoader flinkClassLoader, String[] allowedFlinkPackages) { - super(pluginResourceURLs, PLATFORM_OR_BOOTSTRAP_LOADER); - this.flinkClassLoader = flinkClassLoader; - this.allowedFlinkPackages = allowedFlinkPackages; - allowedResourcePrefixes = - Arrays.stream(allowedFlinkPackages) - .map(packageName -> packageName.replace('.', '/')) - .toArray(String[]::new); - } - - @Override - protected Class<?> loadClass(final String name, final boolean resolve) - throws ClassNotFoundException { - synchronized (getClassLoadingLock(name)) { - final Class<?> loadedClass = findLoadedClass(name); - if (loadedClass != null) { - return resolveIfNeeded(resolve, loadedClass); - } - - if (isAllowedFlinkClass(name)) { - try { - return resolveIfNeeded(resolve, flinkClassLoader.loadClass(name)); - } catch (ClassNotFoundException e) { - // fallback to resolving it in this classloader - // for cases where the plugin uses org.apache.flink namespace - } - } - - return super.loadClass(name, resolve); - } - } - - private Class<?> resolveIfNeeded(final boolean resolve, final Class<?> loadedClass) { - if (resolve) { - resolveClass(loadedClass); - } - - return loadedClass; - } - - @Override - public URL getResource(final String name) { - if (isAllowedFlinkResource(name)) { - return flinkClassLoader.getResource(name); - } - - return super.getResource(name); - } - - @Override - public 
Enumeration<URL> getResources(final String name) throws IOException { - // ChildFirstClassLoader merges child and parent resources - if (isAllowedFlinkResource(name)) { - return flinkClassLoader.getResources(name); - } - - return super.getResources(name); - } - - private boolean isAllowedFlinkClass(final String name) { - return Arrays.stream(allowedFlinkPackages).anyMatch(name::startsWith); - } - - private boolean isAllowedFlinkResource(final String name) { - return Arrays.stream(allowedResourcePrefixes).anyMatch(name::startsWith); - } - - static { - ClassLoader platformLoader = null; - try { - platformLoader = - (ClassLoader) - ClassLoader.class.getMethod("getPlatformClassLoader").invoke(null); - } catch (NoSuchMethodException e) { - // on Java 8 this method does not exist, but using null indicates the bootstrap - // loader that we want - // to have - } catch (Exception e) { - throw new IllegalStateException( - "Cannot retrieve platform classloader on Java 9+", e); - } - PLATFORM_OR_BOOTSTRAP_LOADER = platformLoader; - - ClassLoader.registerAsParallelCapable(); + super(pluginResourceURLs, flinkClassLoader, allowedFlinkPackages, new String[0]); } } } diff --git a/flink-core/src/test/java/org/apache/flink/core/classloading/ComponentClassLoaderTest.java b/flink-core/src/test/java/org/apache/flink/core/classloading/ComponentClassLoaderTest.java new file mode 100644 index 0000000..1d795a1 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/core/classloading/ComponentClassLoaderTest.java @@ -0,0 +1,268 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.core.classloading; + +import org.apache.flink.util.TestLogger; + +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URL; +import java.net.URLClassLoader; +import java.nio.file.Paths; +import java.util.Collections; +import java.util.Enumeration; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.CoreMatchers.sameInstance; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.Is.is; + +/** Tests for the {@link ComponentClassLoader}. 
*/ +public class ComponentClassLoaderTest extends TestLogger { + + private static final String NON_EXISTENT_CLASS_NAME = "foo.Bar"; + private static final Class<?> CLASS_TO_LOAD = Class.class; + private static final Class<?> CLASS_RETURNED_BY_OWNER = ComponentClassLoaderTest.class; + + private static final String NON_EXISTENT_RESOURCE_NAME = "foo/Bar"; + private static String resourceToLoad; + private static final URL RESOURCE_RETURNED_BY_OWNER = createURL(); + + @ClassRule public static final TemporaryFolder TMP = new TemporaryFolder(); + + @BeforeClass + public static void setup() throws IOException { + resourceToLoad = TMP.newFile("tmpfile").getName(); + } + + // ---------------------------------------------------------------------------------------------- + // Class loading + // ---------------------------------------------------------------------------------------------- + + @Test(expected = ClassNotFoundException.class) + public void testComponentOnlyIsDefaultForClasses() throws Exception { + TestUrlClassLoader owner = + new TestUrlClassLoader(NON_EXISTENT_CLASS_NAME, CLASS_RETURNED_BY_OWNER); + + final ComponentClassLoader componentClassLoader = + new ComponentClassLoader(new URL[0], owner, new String[0], new String[0]); + + componentClassLoader.loadClass(NON_EXISTENT_CLASS_NAME); + } + + @Test + public void testOwnerFirstClassFoundIgnoresComponent() throws Exception { + TestUrlClassLoader owner = + new TestUrlClassLoader(CLASS_TO_LOAD.getName(), CLASS_RETURNED_BY_OWNER); + + final ComponentClassLoader componentClassLoader = + new ComponentClassLoader( + new URL[0], owner, new String[] {CLASS_TO_LOAD.getName()}, new String[0]); + + final Class<?> loadedClass = componentClassLoader.loadClass(CLASS_TO_LOAD.getName()); + assertThat(loadedClass, sameInstance(CLASS_RETURNED_BY_OWNER)); + } + + @Test + public void testOwnerFirstClassNotFoundFallsBackToComponent() throws Exception { + TestUrlClassLoader owner = new TestUrlClassLoader(); + + final 
ComponentClassLoader componentClassLoader = + new ComponentClassLoader( + new URL[0], owner, new String[] {CLASS_TO_LOAD.getName()}, new String[0]); + + final Class<?> loadedClass = componentClassLoader.loadClass(CLASS_TO_LOAD.getName()); + assertThat(loadedClass, sameInstance(CLASS_TO_LOAD)); + } + + @Test + public void testComponentFirstClassFoundIgnoresOwner() throws Exception { + TestUrlClassLoader owner = + new TestUrlClassLoader(CLASS_TO_LOAD.getName(), CLASS_RETURNED_BY_OWNER); + + final ComponentClassLoader componentClassLoader = + new ComponentClassLoader( + new URL[0], owner, new String[0], new String[] {CLASS_TO_LOAD.getName()}); + + final Class<?> loadedClass = componentClassLoader.loadClass(CLASS_TO_LOAD.getName()); + assertThat(loadedClass, sameInstance(CLASS_TO_LOAD)); + } + + @Test + public void testComponentFirstClassNotFoundFallsBackToOwner() throws Exception { + TestUrlClassLoader owner = + new TestUrlClassLoader(NON_EXISTENT_CLASS_NAME, CLASS_RETURNED_BY_OWNER); + + final ComponentClassLoader componentClassLoader = + new ComponentClassLoader( + new URL[0], owner, new String[0], new String[] {NON_EXISTENT_CLASS_NAME}); + + final Class<?> loadedClass = componentClassLoader.loadClass(NON_EXISTENT_CLASS_NAME); + assertThat(loadedClass, sameInstance(CLASS_RETURNED_BY_OWNER)); + } + + // ---------------------------------------------------------------------------------------------- + // Resource loading + // ---------------------------------------------------------------------------------------------- + + @Test + public void testComponentOnlyIsDefaultForResources() throws IOException { + TestUrlClassLoader owner = new TestUrlClassLoader(); + + final ComponentClassLoader componentClassLoader = + new ComponentClassLoader(new URL[0], owner, new String[0], new String[0]); + + assertThat(componentClassLoader.getResource(NON_EXISTENT_RESOURCE_NAME), nullValue()); + assertThat( + componentClassLoader.getResources(NON_EXISTENT_RESOURCE_NAME).hasMoreElements(), 
+ is(false)); + } + + @Test + public void testOwnerFirstResourceFoundIgnoresComponent() { + TestUrlClassLoader owner = + new TestUrlClassLoader(resourceToLoad, RESOURCE_RETURNED_BY_OWNER); + + final ComponentClassLoader componentClassLoader = + new ComponentClassLoader( + new URL[] {}, owner, new String[] {resourceToLoad}, new String[0]); + + final URL loadedResource = componentClassLoader.getResource(resourceToLoad); + assertThat(loadedResource, sameInstance(RESOURCE_RETURNED_BY_OWNER)); + } + + @Test + public void testOwnerFirstResourceNotFoundFallsBackToComponent() throws Exception { + TestUrlClassLoader owner = new TestUrlClassLoader(); + + final ComponentClassLoader componentClassLoader = + new ComponentClassLoader( + new URL[] {TMP.getRoot().toURI().toURL()}, + owner, + new String[] {resourceToLoad}, + new String[0]); + + final URL loadedResource = componentClassLoader.getResource(resourceToLoad); + assertThat(loadedResource.toString(), containsString(resourceToLoad)); + } + + @Test + public void testComponentFirstResourceFoundIgnoresOwner() throws Exception { + TestUrlClassLoader owner = + new TestUrlClassLoader(resourceToLoad, RESOURCE_RETURNED_BY_OWNER); + + final ComponentClassLoader componentClassLoader = + new ComponentClassLoader( + new URL[] {TMP.getRoot().toURI().toURL()}, + owner, + new String[0], + new String[] {resourceToLoad}); + + final URL loadedResource = componentClassLoader.getResource(resourceToLoad); + assertThat(loadedResource.toString(), containsString(resourceToLoad)); + } + + @Test + public void testComponentFirstResourceNotFoundFallsBackToOwner() { + TestUrlClassLoader owner = + new TestUrlClassLoader(NON_EXISTENT_RESOURCE_NAME, RESOURCE_RETURNED_BY_OWNER); + + final ComponentClassLoader componentClassLoader = + new ComponentClassLoader( + new URL[0], + owner, + new String[0], + new String[] {NON_EXISTENT_RESOURCE_NAME}); + + final URL loadedResource = componentClassLoader.getResource(NON_EXISTENT_RESOURCE_NAME); + 
assertThat(loadedResource, sameInstance(RESOURCE_RETURNED_BY_OWNER)); + } + + private static class TestUrlClassLoader extends URLClassLoader { + + private final String nameToCheck; + private final Class<?> classToReturn; + private final URL resourceToReturn; + + public TestUrlClassLoader() { + this(null, null, null); + } + + public TestUrlClassLoader(String resourceNameToCheck, URL resourceToReturn) { + this(checkNotNull(resourceNameToCheck), null, checkNotNull(resourceToReturn)); + } + + public TestUrlClassLoader(String classNameToCheck, Class<?> classToReturn) { + this(checkNotNull(classNameToCheck), checkNotNull(classToReturn), null); + } + + public TestUrlClassLoader( + String classNameToCheck, Class<?> classToReturn, URL resourceToReturn) { + super(new URL[0], null); + this.nameToCheck = classNameToCheck; + this.classToReturn = classToReturn; + this.resourceToReturn = resourceToReturn; + } + + @Override + public Class<?> loadClass(String name) throws ClassNotFoundException { + if (nameToCheck == null) { + throw new ClassNotFoundException(); + } + if (nameToCheck.equals(name)) { + return classToReturn; + } + return super.loadClass(name); + } + + @Override + public URL getResource(String name) { + if (nameToCheck == null) { + return null; + } + if (nameToCheck.equals(name)) { + return resourceToReturn; + } + return super.getResource(name); + } + + @Override + public Enumeration<URL> getResources(String name) throws IOException { + if (nameToCheck != null && nameToCheck.equals(name)) { + return new ComponentClassLoader.IteratorBackedEnumeration<>( + Collections.singleton(resourceToReturn).iterator()); + } + return super.getResources(name); + } + } + + private static URL createURL() { + try { + return Paths.get("").toUri().toURL(); + } catch (MalformedURLException e) { + throw new RuntimeException(e); + } + } +}
