XComp commented on a change in pull request #17916:
URL: https://github.com/apache/flink/pull/17916#discussion_r788559877



##########
File path: 
flink-core/src/test/java/org/apache/flink/core/fs/SafeFileSystemFactoryTest.java
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.util.ChildFirstClassLoader;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.net.URI;
+import java.net.URL;
+import java.security.AccessControlContext;
+import java.security.AccessController;
+import java.security.ProtectionDomain;
+import java.util.List;
+import java.util.Vector;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
+import static org.assertj.core.api.Assertions.assertThatIOException;
+
+/** Tests for the {@link SafeFileSystemFactory}. */
+public class SafeFileSystemFactoryTest {
+
+    /**
+     * We need to load this class in separate classloader to leak the 
classloader through the {@link
+     * ProtectionDomain}.
+     */
+    private static class ProtectionDomainLeakTest {
+
+        ProtectionDomainLeakTest(FileSystemFactory factory) throws Exception {
+            factory.create(URI.create("test://filesystem"));
+        }
+    }
+
+    private abstract static class TestFileSystemFactory implements 
FileSystemFactory {
+
+        @Override
+        public String getScheme() {
+            throw new UnsupportedOperationException("Test.");
+        }
+    }
+
+    private static Stream<Class<?>> getLoadedClasses(ClassLoader classLoader) 
throws Exception {
+        final Field field = ClassLoader.class.getDeclaredField("classes");
+        field.setAccessible(true);
+        @SuppressWarnings("unchecked")
+        final Vector<Class<?>> classes = (Vector<Class<?>>) 
field.get(classLoader);
+        return classes.stream();
+    }
+
+    private static List<Class<?>> getLoadedFlinkClasses(ClassLoader 
classLoader) throws Exception {
+        return getLoadedClasses(classLoader)
+                .filter(c -> c.getName().startsWith("org.apache.flink"))
+                .collect(Collectors.toList());
+    }
+
+    private static ProtectionDomain[] getProtectionDomains() {
+        try {
+            final AccessControlContext context = AccessController.getContext();
+            final Method domainResolver = 
context.getClass().getDeclaredMethod("getContext");
+            domainResolver.setAccessible(true);
+            return (ProtectionDomain[]) domainResolver.invoke(context);
+        } catch (NoSuchMethodException | InvocationTargetException | 
IllegalAccessException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Test
+    void testProtectionDomainLeaksWithRegularFileSystemFactory() throws 
Exception {
+        testProtectionDomainLeak(
+                new TestFileSystemFactory() {
+
+                    @Override
+                    public FileSystem create(URI fsUri) {
+                        boolean leak = false;
+                        for (ProtectionDomain protectionDomain : 
getProtectionDomains()) {
+                            if (protectionDomain.getClassLoader()
+                                    instanceof ChildFirstClassLoader) {
+                                leak = true;
+                                break;
+                            }
+                        }
+                        assertThat(leak).isTrue();
+                        return null;
+                    }
+                });
+    }
+
+    @Test
+    void testProtectionDomainDoesNotLeakWithWrappedFileSystemFactory() throws 
Exception {
+        testProtectionDomainLeak(
+                SafeFileSystemFactory.of(
+                        new TestFileSystemFactory() {
+
+                            @Override
+                            public FileSystem create(URI fsUri) {
+                                for (ProtectionDomain protectionDomain : 
getProtectionDomains()) {
+                                    
assertThat(protectionDomain.getCodeSource())
+                                            
.isNotInstanceOf(ChildFirstClassLoader.class);
+                                }
+                                return null;
+                            }
+                        }));
+    }
+
+    private void testProtectionDomainLeak(FileSystemFactory factory) throws 
Exception {
+        final URL testClassLocation =
+                getClass().getProtectionDomain().getCodeSource().getLocation();
+        final ChildFirstClassLoader leakyClassLoader =
+                new ChildFirstClassLoader(
+                        new URL[] {testClassLocation},
+                        Thread.currentThread().getContextClassLoader(),
+                        new String[] {"java."},
+                        error -> {
+                            // No-op.
+                        });
+        final Class<?> clazz =
+                Class.forName(ProtectionDomainLeakTest.class.getName(), true, 
leakyClassLoader);
+        final Constructor<?> declaredConstructor =
+                clazz.getDeclaredConstructor(FileSystemFactory.class);
+        declaredConstructor.setAccessible(true);
+        declaredConstructor.newInstance(factory);
+        // We have to assert class name here, because the class we want to 
assert has been loaded in
+        // a different classloader.

Review comment:
       Initially, I didn't understand this comment. It might help others, too, if it 
explicitly mentioned the implicit fact that a class loaded by a different 
classloader is represented by a distinct `Class` object (a class's runtime 
identity is its name plus its defining classloader), which is why the class name 
has to be asserted here instead of the `Class` itself.

##########
File path: 
flink-core/src/main/java/org/apache/flink/core/fs/SafeFileSystemFactory.java
##########
@@ -18,29 +18,85 @@
 package org.apache.flink.core.fs;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.FatalExitExceptionHandler;
 import org.apache.flink.util.TemporaryClassLoaderContext;
 import org.apache.flink.util.WrappingProxy;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
 
 import java.io.IOException;
 import java.net.URI;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 /**
- * A wrapper around {@link FileSystemFactory} that ensures the plugin 
classloader is used for all
- * {@link FileSystem} operations.
+ * A wrapper around {@link FileSystemFactory} that ensures the same 
classloader is used for all
+ * {@link FileSystem} operations and that we don't leak the user classloader 
via inherited {@link
+ * java.security.ProtectionDomain protection domains}.
+ *
+ * <h1>Avoiding protection domains leaks
+ *
+ * <p>To understand the protection domain leaks, let's revisit how the 
classloading works.
+ *
+ * <p>Every class that is loaded is assigned with a {@link 
java.security.ProtectionDomain protection
+ * domain}. The {@code protection domain} contains reference to a {@link 
java.security.CodeSource}
+ * (location of the class file + signing certificates), reference to a {@link 
ClassLoader} that has
+ * loaded it, {@link java.security.Principal list of principals} and {@link 
java.security.Permission
+ * permissions}.
+ *
+ * <p>The problematic part is the reference to the {@link ClassLoader} that 
has loaded our class.
+ * The only way to get rid of the reference, would involve re-implementing a 
good part of the low
+ * level ClassLoaders, because the important parts are intentionally marked as 
final.
+ *
+ * <p>Now to the actual problem. There is a thing called {@link 
java.security.AccessControlContext},
+ * which can be obtained by calling {@link 
java.security.AccessController#getContext()}. This
+ * context contains list of the current protection domains.
+ *
+ * <p>In pseudo-code, the list of protection domain is obtained as follows:
+ *
+ * <pre>
+ * {@code getCurrentStackTrace() | getStackTraceElementClass | 
Class#getProtectionDomain | uniq (as in bash)}
+ * </pre>
+ *
+ * <p>When a new thread is spawned it inherits the {@link 
java.security.AccessControlContext} of the
+ * caller, which means that if we spawn any thread that outlives a job from a 
place that has been
+ * called by user code (there is anything loaded by user classloader on 
stack), we'll leak the
+ * reference to user classloader to this newly created thread. This happens 
especially with hadoop
+ * filesystem, where the initialization of the filesystem spawn "cleaner 
threads" that are attached
+ * to the JVM lifecycle.
+ *
+ * <p>The way to work around this, is to spawn initialize filesystems in a 
"safe" thread, that has
+ * no potentially leaky protection domains, that the new thread can inherit.

Review comment:
       Could we add a reference to the ticket here, and add the stack trace you 
linked in one of the other comments 
(https://gist.github.com/dmvk/a77207634cde8094baf456430d1bd92d) to the FLINK 
ticket? That should be sufficient and keeps the JavaDoc from becoming even more 
verbose.

##########
File path: 
flink-core/src/main/java/org/apache/flink/core/fs/SafeFileSystemFactory.java
##########
@@ -18,29 +18,85 @@
 package org.apache.flink.core.fs;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.FatalExitExceptionHandler;
 import org.apache.flink.util.TemporaryClassLoaderContext;
 import org.apache.flink.util.WrappingProxy;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
 
 import java.io.IOException;
 import java.net.URI;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 /**
- * A wrapper around {@link FileSystemFactory} that ensures the plugin 
classloader is used for all
- * {@link FileSystem} operations.
+ * A wrapper around {@link FileSystemFactory} that ensures the same 
classloader is used for all

Review comment:
       There seems to be a checkstyle issue with this JavaDoc (possibly the 
`<h1>Avoiding protection domains leaks` heading further down, which is never closed).




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to