XComp commented on a change in pull request #17916: URL: https://github.com/apache/flink/pull/17916#discussion_r788559877
########## File path: flink-core/src/test/java/org/apache/flink/core/fs/SafeFileSystemFactoryTest.java ########## @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.core.fs; + +import org.apache.flink.util.ChildFirstClassLoader; + +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.lang.reflect.Constructor; +import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.net.URI; +import java.net.URL; +import java.security.AccessControlContext; +import java.security.AccessController; +import java.security.ProtectionDomain; +import java.util.List; +import java.util.Vector; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; +import static org.assertj.core.api.Assertions.assertThatIOException; + +/** Tests for the {@link SafeFileSystemFactory}. */ +public class SafeFileSystemFactoryTest { + + /** + * We need to load this class in separate classloader to leak the classloader through the {@link + * ProtectionDomain}. 
+ */ + private static class ProtectionDomainLeakTest { + + ProtectionDomainLeakTest(FileSystemFactory factory) throws Exception { + factory.create(URI.create("test://filesystem")); + } + } + + private abstract static class TestFileSystemFactory implements FileSystemFactory { + + @Override + public String getScheme() { + throw new UnsupportedOperationException("Test."); + } + } + + private static Stream<Class<?>> getLoadedClasses(ClassLoader classLoader) throws Exception { + final Field field = ClassLoader.class.getDeclaredField("classes"); + field.setAccessible(true); + @SuppressWarnings("unchecked") + final Vector<Class<?>> classes = (Vector<Class<?>>) field.get(classLoader); + return classes.stream(); + } + + private static List<Class<?>> getLoadedFlinkClasses(ClassLoader classLoader) throws Exception { + return getLoadedClasses(classLoader) + .filter(c -> c.getName().startsWith("org.apache.flink")) + .collect(Collectors.toList()); + } + + private static ProtectionDomain[] getProtectionDomains() { + try { + final AccessControlContext context = AccessController.getContext(); + final Method domainResolver = context.getClass().getDeclaredMethod("getContext"); + domainResolver.setAccessible(true); + return (ProtectionDomain[]) domainResolver.invoke(context); + } catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) { + throw new RuntimeException(e); + } + } + + @Test + void testProtectionDomainLeaksWithRegularFileSystemFactory() throws Exception { + testProtectionDomainLeak( + new TestFileSystemFactory() { + + @Override + public FileSystem create(URI fsUri) { + boolean leak = false; + for (ProtectionDomain protectionDomain : getProtectionDomains()) { + if (protectionDomain.getClassLoader() + instanceof ChildFirstClassLoader) { + leak = true; + break; + } + } + assertThat(leak).isTrue(); + return null; + } + }); + } + + @Test + void testProtectionDomainDoesNotLeakWithWrappedFileSystemFactory() throws Exception { + 
testProtectionDomainLeak( + SafeFileSystemFactory.of( + new TestFileSystemFactory() { + + @Override + public FileSystem create(URI fsUri) { + for (ProtectionDomain protectionDomain : getProtectionDomains()) { + assertThat(protectionDomain.getCodeSource()) + .isNotInstanceOf(ChildFirstClassLoader.class); + } + return null; + } + })); + } + + private void testProtectionDomainLeak(FileSystemFactory factory) throws Exception { + final URL testClassLocation = + getClass().getProtectionDomain().getCodeSource().getLocation(); + final ChildFirstClassLoader leakyClassLoader = + new ChildFirstClassLoader( + new URL[] {testClassLocation}, + Thread.currentThread().getContextClassLoader(), + new String[] {"java."}, + error -> { + // No-op. + }); + final Class<?> clazz = + Class.forName(ProtectionDomainLeakTest.class.getName(), true, leakyClassLoader); + final Constructor<?> declaredConstructor = + clazz.getDeclaredConstructor(FileSystemFactory.class); + declaredConstructor.setAccessible(true); + declaredConstructor.newInstance(factory); + // We have to assert class name here, because the class we want to assert has been loaded in + // a different classloader. Review comment: Initially, I didn't understand this comment here. 
It might help others as well to mention the implicit fact that the `Class` object attached to an instance of the class is actually a new `Class` instance when the class is loaded from a different classloader. ########## File path: flink-core/src/main/java/org/apache/flink/core/fs/SafeFileSystemFactory.java ########## @@ -18,29 +18,85 @@ package org.apache.flink.core.fs; import org.apache.flink.configuration.Configuration; +import org.apache.flink.util.FatalExitExceptionHandler; import org.apache.flink.util.TemporaryClassLoaderContext; import org.apache.flink.util.WrappingProxy; +import org.apache.flink.util.concurrent.ExecutorThreadFactory; import java.io.IOException; import java.net.URI; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; /** - * A wrapper around {@link FileSystemFactory} that ensures the plugin classloader is used for all - * {@link FileSystem} operations. + * A wrapper around {@link FileSystemFactory} that ensures the same classloader is used for all + * {@link FileSystem} operations and that we don't leak the user classloader via inherited {@link + * java.security.ProtectionDomain protection domains}. + * + * <h1>Avoiding protection domains leaks + * + * <p>To understand the protection domain leaks, let's revisit how the classloading works. + * + * <p>Every class that is loaded is assigned with a {@link java.security.ProtectionDomain protection + * domain}. The {@code protection domain} contains reference to a {@link java.security.CodeSource} + * (location of the class file + signing certificates), reference to a {@link ClassLoader} that has + * loaded it, {@link java.security.Principal list of principals} and {@link java.security.Permission + * permissions}. + * + * <p>The problematic part is the reference to the {@link ClassLoader} that has loaded our class. 
+ * The only way to get rid of the reference, would involve re-implementing a good part of the low + * level ClassLoaders, because the important parts are intentionally marked as final. + * + * <p>Now to the actual problem. There is a thing called {@link java.security.AccessControlContext}, + * which can be obtained by calling {@link java.security.AccessController#getContext()}. This + * context contains list of the current protection domains. + * + * <p>In pseudo-code, the list of protection domain is obtained as follows: + * + * <pre> + * {@code getCurrentStackTrace() | getStackTraceElementClass | Class#getProtectionDomain | uniq (as in bash)} + * </pre> + * + * <p>When a new thread is spawned it inherits the {@link java.security.AccessControlContext} of the + * caller, which means that if we spawn any thread that outlives a job from a place that has been + * called by user code (there is anything loaded by user classloader on stack), we'll leak the + * reference to user classloader to this newly created thread. This happens especially with hadoop + * filesystem, where the initialization of the filesystem spawn "cleaner threads" that are attached + * to the JVM lifecycle. + * + * <p>The way to work around this, is to spawn initialize filesystems in a "safe" thread, that has + * no potentially leaky protection domains, that the new thread can inherit. Review comment: Could we add a reference to the ticket here, and add the stacktrace that you linked in one of the other comments (https://gist.github.com/dmvk/a77207634cde8094baf456430d1bd92d) to the FLINK ticket? 
That should be sufficient and wouldn't make the JavaDoc even more verbose. ########## File path: flink-core/src/main/java/org/apache/flink/core/fs/SafeFileSystemFactory.java ########## @@ -18,29 +18,85 @@ package org.apache.flink.core.fs; import org.apache.flink.configuration.Configuration; +import org.apache.flink.util.FatalExitExceptionHandler; import org.apache.flink.util.TemporaryClassLoaderContext; import org.apache.flink.util.WrappingProxy; +import org.apache.flink.util.concurrent.ExecutorThreadFactory; import java.io.IOException; import java.net.URI; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; /** - * A wrapper around {@link FileSystemFactory} that ensures the plugin classloader is used for all - * {@link FileSystem} operations. + * A wrapper around {@link FileSystemFactory} that ensures the same classloader is used for all Review comment: There seems to be some checkstyle issue with this JavaDoc. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
