tillrohrmann commented on a change in pull request #16345: URL: https://github.com/apache/flink/pull/16345#discussion_r668568728
########## File path: flink-core/src/main/java/org/apache/flink/core/classloading/ComponentClassLoader.java ########## @@ -0,0 +1,268 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.core.classloading; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.util.function.FunctionWithException; + +import org.apache.flink.shaded.guava18.com.google.common.collect.Iterators; + +import java.io.IOException; +import java.net.URL; +import java.net.URLClassLoader; +import java.util.Arrays; +import java.util.Enumeration; +import java.util.Iterator; + +/** + * A {@link URLClassLoader} that restricts which classes can be loaded to those contained within the + * given classpath, except classes from a given set of packages that are either loaded owner or + * component-first. + * + * <p>Depiction of the class loader hierarchy: + * + * <pre> + * Owner Bootstrap + * ^ ^ + * |---------| + * | + * Component + * </pre> + * + * <p>For loading classes/resources, class loaders are accessed in one of the following orders: + * + * <ul> + * <li>component-only: component -> bootstrap; default. + * <li>component-first: component -> bootstrap -> owner; opt-in. + * <li>owner-first: owner -> component -> bootstrap; opt-in. + * </ul> + */ +public class ComponentClassLoader extends URLClassLoader { + private static final ClassLoader PLATFORM_OR_BOOTSTRAP_LOADER; + + private final ClassLoader ownerClassLoader; + + private final String[] ownerFirstPackages; + private final String[] componentFirstPackages; + private final String[] ownerFirstResourcePrefixes; + private final String[] componentFirstResourcePrefixes; + + public ComponentClassLoader( + URL[] classpath, + ClassLoader ownerClassLoader, + String[] ownerFirstPackages, + String[] componentFirstPackages) { + super(classpath, PLATFORM_OR_BOOTSTRAP_LOADER); + this.ownerClassLoader = ownerClassLoader; + + this.ownerFirstPackages = ownerFirstPackages; + this.componentFirstPackages = componentFirstPackages; + + ownerFirstResourcePrefixes = convertPackagePrefixesToPathPrefixes(ownerFirstPackages); + componentFirstResourcePrefixes = + convertPackagePrefixesToPathPrefixes(componentFirstPackages); + } + + // ---------------------------------------------------------------------------------------------- + // Class loading + // ---------------------------------------------------------------------------------------------- + + @Override + protected Class<?> loadClass(final String name, final boolean resolve) + throws ClassNotFoundException { + synchronized (getClassLoadingLock(name)) { + final Class<?> loadedClass = findLoadedClass(name); + if (loadedClass != null) { + return resolveIfNeeded(resolve, loadedClass); + } + + if (isComponentFirstClass(name)) { + return loadClassFromComponentFirst(name, resolve); + } + if (isOwnerFirstClass(name)) { + return loadClassFromOwnerFirst(name, resolve); + } + + // making this behavior configurable (component-only/component-first/owner-first) would + // allow + // this class to subsume the + // FlinkUserCodeClassLoader (with an added exception handler) Review comment: Formatting seems a bit off. ########## File path: flink-rpc/flink-rpc-akka-loader/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcSystemLoader.java ########## @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rpc.akka; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ConfigurationUtils; +import org.apache.flink.core.classloading.SubmoduleClassLoader; +import org.apache.flink.runtime.rpc.RpcSystem; +import org.apache.flink.runtime.rpc.RpcSystemLoader; +import org.apache.flink.util.IOUtils; + +import java.io.IOException; +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ServiceLoader; +import java.util.UUID; + +/** + * Loader for the {@link AkkaRpcSystemLoader}. + * + * <p>This loader expects the flink-rpc-akka jar to be accessible via {@link + * ClassLoader#getResource(String)}. It will extract the jar into a temporary directory and create a + * new {@link SubmoduleClassLoader} to load the rpc system from that jar. + */ +public class AkkaRpcSystemLoader implements RpcSystemLoader { + + @Override + public RpcSystem loadRpcSystem(Configuration config) { + try { + final ClassLoader flinkClassLoader = RpcSystem.class.getClassLoader(); + + final String tmpDirectory = ConfigurationUtils.parseTempDirectories(config)[0]; + final Path tempFile = + Files.createFile( + Paths.get(tmpDirectory, UUID.randomUUID() + "_flink-rpc-akka.jar")); Review comment: I am wondering whether `flink-rpc-akka_UUID.jar` would make it easier to find these files in the temp directory. Any specific reason why the UUID is the prefix? ########## File path: flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/ContextClassLoadingSettingTest.java ########## @@ -0,0 +1,381 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rpc.akka; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils; +import org.apache.flink.runtime.rpc.RpcEndpoint; +import org.apache.flink.runtime.rpc.RpcGateway; +import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.util.TestLogger; +import org.apache.flink.util.concurrent.FutureUtils; + +import akka.actor.ActorSystem; +import akka.actor.Terminated; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.net.URL; +import java.net.URLClassLoader; +import java.util.Arrays; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader; +import static org.hamcrest.CoreMatchers.either; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.Is.is; + +/** + * Tests the context class loader handling in various parts of the akka rpc system. + * + * <p>The tests check cases where we call from the akka rpc system into Flink, in which case the + * context class loader must be set to the Flink class loader. This ensures that the Akka class + * loader does not get accidentally leaked, e.g., via thread locals or thread pools on the Flink + * side. + */ +public class ContextClassLoadingSettingTest extends TestLogger { + + private static final Time TIMEOUT = Time.milliseconds(10000L); + + // Many of the contained tests assert that a future is completed with a specific context class + // loader by applying a synchronous operation. + // If the initial future is completed by the time we apply the synchronous operation the test + // thread will execute the operation instead. The tests are thus susceptible to timing issues. + // We hence take a probabilistic approach: Assume that this timing is rare, guard these calls in + // the test with a temporary class loader context, and assert that the actually used + // context class loader is _either_ the one we truly expect or the temporary one. + private static final ClassLoader testClassLoader = + new URLClassLoader(new URL[0], ContextClassLoadingSettingTest.class.getClassLoader()); + + private ClassLoader pretendFlinkClassLoader; + private ActorSystem actorSystem; + private AkkaRpcService akkaRpcService; + + @Before + public void setup() { + pretendFlinkClassLoader = + new URLClassLoader( + new URL[0], ContextClassLoadingSettingTest.class.getClassLoader()); + actorSystem = AkkaUtils.createDefaultActorSystem(); + akkaRpcService = + new AkkaRpcService( + actorSystem, + AkkaRpcServiceConfiguration.defaultConfiguration(), + pretendFlinkClassLoader); + } + + @After Review comment: Ok, I see, we stop it in a test. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java ########## @@ -1069,7 +1069,7 @@ private void terminateMiniClusterServices() throws Exception { } try { - rpcSystem.cleanup(); + rpcSystem.close(); Review comment: I like `Closer`. We don't have to change it for this PR but consider it for a potential improvement in the future. ########## File path: flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/ContextClassLoadingSettingTest.java ########## @@ -0,0 +1,381 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rpc.akka; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils; +import org.apache.flink.runtime.rpc.RpcEndpoint; +import org.apache.flink.runtime.rpc.RpcGateway; +import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.util.TestLogger; +import org.apache.flink.util.concurrent.FutureUtils; + +import akka.actor.ActorSystem; +import akka.actor.Terminated; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.net.URL; +import java.net.URLClassLoader; +import java.util.Arrays; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader; +import static org.hamcrest.CoreMatchers.either; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.Is.is; + +/** + * Tests the context class loader handling in various parts of the akka rpc system. + * + * <p>The tests check cases where we call from the akka rpc system into Flink, in which case the + * context class loader must be set to the Flink class loader. This ensures that the Akka class + * loader does not get accidentally leaked, e.g., via thread locals or thread pools on the Flink + * side. + */ +public class ContextClassLoadingSettingTest extends TestLogger { + + private static final Time TIMEOUT = Time.milliseconds(10000L); + + // Many of the contained tests assert that a future is completed with a specific context class + // loader by applying a synchronous operation. + // If the initial future is completed by the time we apply the synchronous operation the test + // thread will execute the operation instead. The tests are thus susceptible to timing issues. + // We hence take a probabilistic approach: Assume that this timing is rare, guard these calls in + // the test with a temporary class loader context, and assert that the actually used + // context class loader is _either_ the one we truly expect or the temporary one. + private static final ClassLoader testClassLoader = + new URLClassLoader(new URL[0], ContextClassLoadingSettingTest.class.getClassLoader()); + + private ClassLoader pretendFlinkClassLoader; + private ActorSystem actorSystem; + private AkkaRpcService akkaRpcService; + + @Before + public void setup() { + pretendFlinkClassLoader = + new URLClassLoader( + new URL[0], ContextClassLoadingSettingTest.class.getClassLoader()); + actorSystem = AkkaUtils.createDefaultActorSystem(); + akkaRpcService = + new AkkaRpcService( + actorSystem, + AkkaRpcServiceConfiguration.defaultConfiguration(), + pretendFlinkClassLoader); + } + + @After Review comment: Is it required that we set up the `AkkaRpcService` for each test or could it be reused? ########## File path: flink-core/src/test/java/org/apache/flink/core/classloading/ComponentClassLoaderTest.java ########## @@ -0,0 +1,264 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.core.classloading; + +import org.apache.flink.util.TestLogger; + +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URL; +import java.net.URLClassLoader; +import java.nio.file.Paths; +import java.util.Collections; +import java.util.Enumeration; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.CoreMatchers.sameInstance; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.Is.is; + +/** Tests for the {@link ComponentClassLoader}. */ +public class ComponentClassLoaderTest extends TestLogger { + + private static final String NON_EXISTENT_CLASS_NAME = "foo.Bar"; + private static final Class<?> CLASS_TO_LOAD = Class.class; + private static final Class<?> CLASS_RETURNED_BY_OWNER = ComponentClassLoaderTest.class; + + private static final String NON_EXISTENT_RESOURCE_NAME = "foo/Bar"; + private static String resourceToLoad; + private static final URL RESOURCE_RETURNED_BY_OWNER = createURL(); + + @ClassRule public static final TemporaryFolder TMP = new TemporaryFolder(); + + @BeforeClass + public static void setup() throws IOException { + resourceToLoad = TMP.newFile("tmpfile").getName(); + } + + // ---------------------------------------------------------------------------------------------- + // Class loading + // ---------------------------------------------------------------------------------------------- + + @Test(expected = ClassNotFoundException.class) + public void testComponentOnlyIsDefaultForClasses() throws Exception { + TestUrlClassLoader owner = + new TestUrlClassLoader(NON_EXISTENT_CLASS_NAME, CLASS_RETURNED_BY_OWNER); + + final ComponentClassLoader componentClassLoader = + new ComponentClassLoader(new URL[0], owner, new String[0], new String[0]); + + componentClassLoader.loadClass(NON_EXISTENT_CLASS_NAME); + } + + @Test + public void testOwnerFirstClassFoundIgnoresComponent() throws Exception { + TestUrlClassLoader owner = + new TestUrlClassLoader(CLASS_TO_LOAD.getName(), CLASS_RETURNED_BY_OWNER); + + final ComponentClassLoader componentClassLoader = + new ComponentClassLoader( + new URL[0], owner, new String[] {CLASS_TO_LOAD.getName()}, new String[0]); + + final Class<?> loadedClass = componentClassLoader.loadClass(CLASS_TO_LOAD.getName()); + assertThat(loadedClass, sameInstance(CLASS_RETURNED_BY_OWNER)); + } + + @Test + public void testOwnerFirstClassNotFoundFallsBackToComponent() throws Exception { + TestUrlClassLoader owner = new TestUrlClassLoader(); + + final ComponentClassLoader componentClassLoader = + new ComponentClassLoader( + new URL[0], owner, new String[] {CLASS_TO_LOAD.getName()}, new String[0]); + + final Class<?> loadedClass = componentClassLoader.loadClass(CLASS_TO_LOAD.getName()); + assertThat(loadedClass, sameInstance(CLASS_TO_LOAD)); + } + + @Test + public void testComponentFirstClassFoundIgnoresOwner() throws Exception { + TestUrlClassLoader owner = + new TestUrlClassLoader(CLASS_TO_LOAD.getName(), CLASS_RETURNED_BY_OWNER); + + final ComponentClassLoader componentClassLoader = + new ComponentClassLoader( + new URL[0], owner, new String[0], new String[] {CLASS_TO_LOAD.getName()}); + + final Class<?> loadedClass = componentClassLoader.loadClass(CLASS_TO_LOAD.getName()); + assertThat(loadedClass, sameInstance(CLASS_TO_LOAD)); + } + + @Test + public void testComponentFirstClassNotFoundFallsBackToOwner() throws Exception { + TestUrlClassLoader owner = + new TestUrlClassLoader(NON_EXISTENT_CLASS_NAME, CLASS_RETURNED_BY_OWNER); + + final ComponentClassLoader componentClassLoader = + new ComponentClassLoader( + new URL[0], owner, new String[0], new String[] {NON_EXISTENT_CLASS_NAME}); + + final Class<?> loadedClass = componentClassLoader.loadClass(NON_EXISTENT_CLASS_NAME); + assertThat(loadedClass, sameInstance(CLASS_RETURNED_BY_OWNER)); + } + + // ---------------------------------------------------------------------------------------------- + // Resource loading + // ---------------------------------------------------------------------------------------------- + + @Test + public void testComponentOnlyIsDefaultForResources() throws IOException { + TestUrlClassLoader owner = new TestUrlClassLoader(); + + final ComponentClassLoader componentClassLoader = + new ComponentClassLoader(new URL[0], owner, new String[0], new String[0]); + + assertThat(componentClassLoader.getResource(NON_EXISTENT_RESOURCE_NAME), nullValue()); + assertThat( + componentClassLoader.getResources(NON_EXISTENT_RESOURCE_NAME).hasMoreElements(), + is(false)); + } + + @Test + public void testOwnerFirstResourceFoundIgnoresComponent() { + TestUrlClassLoader owner = + new TestUrlClassLoader(resourceToLoad, RESOURCE_RETURNED_BY_OWNER); + + final ComponentClassLoader componentClassLoader = + new ComponentClassLoader( + new URL[] {}, owner, new String[] {resourceToLoad}, new String[0]); + + final URL loadedResource = componentClassLoader.getResource(resourceToLoad); + assertThat(loadedResource, sameInstance(RESOURCE_RETURNED_BY_OWNER)); + } + + @Test + public void testOwnerFirstResourceNotFoundFallsBackToComponent() throws Exception { + TestUrlClassLoader owner = new TestUrlClassLoader(); + + final ComponentClassLoader componentClassLoader = + new ComponentClassLoader( + new URL[] {TMP.getRoot().toURI().toURL()}, + owner, + new String[] {resourceToLoad}, + new String[0]); + + final URL loadedResource = componentClassLoader.getResource(resourceToLoad); + assertThat(loadedResource.toString(), containsString(resourceToLoad)); + } + + @Test + public void testComponentFirstResourceFoundIgnoresOwner() throws Exception { + TestUrlClassLoader owner = + new TestUrlClassLoader(resourceToLoad, RESOURCE_RETURNED_BY_OWNER); + + final ComponentClassLoader componentClassLoader = + new ComponentClassLoader( + new URL[] {TMP.getRoot().toURI().toURL()}, + owner, + new String[0], + new String[] {resourceToLoad}); + + final URL loadedResource = componentClassLoader.getResource(resourceToLoad); + assertThat(loadedResource.toString(), containsString(resourceToLoad)); + } + + @Test + public void testComponentFirstResourceNotFoundFallsBackToOwner() { + TestUrlClassLoader owner = + new TestUrlClassLoader(NON_EXISTENT_RESOURCE_NAME, RESOURCE_RETURNED_BY_OWNER); + + final ComponentClassLoader componentClassLoader = + new ComponentClassLoader(new URL[0], owner, new String[0], new String[] {"foo"}); Review comment: Why is it here `{"foo"}` and not `{resourceToLoad}`? ########## File path: flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/ContextClassLoadingSettingTest.java ########## @@ -0,0 +1,381 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rpc.akka; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils; +import org.apache.flink.runtime.rpc.RpcEndpoint; +import org.apache.flink.runtime.rpc.RpcGateway; +import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.util.TestLogger; +import org.apache.flink.util.concurrent.FutureUtils; + +import akka.actor.ActorSystem; +import akka.actor.Terminated; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.net.URL; +import java.net.URLClassLoader; +import java.util.Arrays; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader; +import static org.hamcrest.CoreMatchers.either; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.Is.is; + +/** + * Tests the context class loader handling in various parts of the akka rpc system. + * + * <p>The tests check cases where we call from the akka rpc system into Flink, in which case the + * context class loader must be set to the Flink class loader. This ensures that the Akka class + * loader does not get accidentally leaked, e.g., via thread locals or thread pools on the Flink + * side. + */ +public class ContextClassLoadingSettingTest extends TestLogger { + + private static final Time TIMEOUT = Time.milliseconds(10000L); + + // Many of the contained tests assert that a future is completed with a specific context class + // loader by applying a synchronous operation. + // If the initial future is completed by the time we apply the synchronous operation the test + // thread will execute the operation instead. The tests are thus susceptible to timing issues. + // We hence take a probabilistic approach: Assume that this timing is rare, guard these calls in + // the test with a temporary class loader context, and assert that the actually used + // context class loader is _either_ the one we truly expect or the temporary one. + private static final ClassLoader testClassLoader = + new URLClassLoader(new URL[0], ContextClassLoadingSettingTest.class.getClassLoader()); + + private ClassLoader pretendFlinkClassLoader; + private ActorSystem actorSystem; + private AkkaRpcService akkaRpcService; + + @Before + public void setup() { + pretendFlinkClassLoader = + new URLClassLoader( + new URL[0], ContextClassLoadingSettingTest.class.getClassLoader()); + actorSystem = AkkaUtils.createDefaultActorSystem(); + akkaRpcService = + new AkkaRpcService( + actorSystem, + AkkaRpcServiceConfiguration.defaultConfiguration(), + pretendFlinkClassLoader); + } + + @After + public void shutdown() throws InterruptedException, ExecutionException, TimeoutException { + final CompletableFuture<Void> rpcTerminationFuture = akkaRpcService.stopService(); + final CompletableFuture<Terminated> actorSystemTerminationFuture = + AkkaFutureUtils.toJava(actorSystem.terminate()); + + FutureUtils.waitForAll(Arrays.asList(rpcTerminationFuture, actorSystemTerminationFuture)) + .get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS); + + actorSystem = null; + akkaRpcService = null; + } + + @Test + public void testAkkaRpcService_ExecuteRunnableSetsFlinkContextClassLoader() + throws ExecutionException, InterruptedException { + final CompletableFuture<ClassLoader> contextClassLoader = new CompletableFuture<>(); + akkaRpcService.execute( + () -> contextClassLoader.complete(Thread.currentThread().getContextClassLoader())); + assertIsFlinkClassLoader(contextClassLoader.get()); + } + + @Test + public void testAkkaRpcService_ExecuteCallableSetsFlinkContextClassLoader() + throws ExecutionException, InterruptedException { + final CompletableFuture<ClassLoader> contextClassLoader = + akkaRpcService.execute(() -> Thread.currentThread().getContextClassLoader()); + assertIsFlinkClassLoader(contextClassLoader.get()); + } + + @Test + public void testAkkaRpcService_ExecuteCallableResultCompletedWithFlinkContextClassLoader() + throws ExecutionException, InterruptedException { + + final CompletableFuture<Void> blocker = new CompletableFuture<>(); + + final CompletableFuture<ClassLoader> contextClassLoader = + runWithContextClassLoader( + () -> + akkaRpcService + .execute((Callable<Void>) blocker::get) + .thenApply( + ignored -> + Thread.currentThread() + .getContextClassLoader()), + testClassLoader); + blocker.complete(null); + + assertIsFlinkClassLoader(contextClassLoader.get()); + } + + @Test + public void testAkkaRpcService_ScheduleSetsFlinkContextClassLoader() + throws ExecutionException, InterruptedException { + final CompletableFuture<ClassLoader> contextClassLoader = new CompletableFuture<>(); + akkaRpcService.scheduleRunnable( + () -> contextClassLoader.complete(Thread.currentThread().getContextClassLoader()), + 5, + TimeUnit.MILLISECONDS); + assertThat(contextClassLoader.get(), is(pretendFlinkClassLoader)); + } + + @Test + public void testAkkaRpcService_ConnectFutureCompletedWithFlinkContextClassLoader() + throws Exception { + try (final TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService)) { + + final ClassLoader contextClassLoader = + runWithContextClassLoader( + () -> + akkaRpcService + .connect( + testEndpoint.getAddress(), + TestEndpointGateway.class) + .thenApply( + ignored -> + Thread.currentThread() + .getContextClassLoader()) + .get(), + testClassLoader); + assertIsFlinkClassLoader(contextClassLoader); + } + } + + @Test + public void testAkkaRpcService_TerminationFutureCompletedWithFlinkContextClassLoader() + throws Exception { + final ClassLoader contextClassLoader = + runWithContextClassLoader( + () -> + akkaRpcService + .stopService() + .thenApply( + ignored -> + Thread.currentThread() + .getContextClassLoader()) + .get(), + testClassLoader); + + assertIsFlinkClassLoader(contextClassLoader); + } + + @Test + public void testAkkaRpcActor_OnStartCalledWithFlinkContextClassLoader() throws Exception { + try (final TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService)) { + testEndpoint.start(); + assertIsFlinkClassLoader(testEndpoint.onStartClassLoader.get()); + } + } + + @Test + public void testAkkaRpcActor_OnStopCalledWithFlinkContextClassLoader() throws Exception { + final TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService); + testEndpoint.start(); + testEndpoint.close(); + + assertIsFlinkClassLoader(testEndpoint.onStopClassLoader.get()); + } + + @Test + public void testAkkaRpcActor_CallAsyncCalledWithFlinkContextClassLoader() throws Exception { + try (final TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService)) { + testEndpoint.start(); + + final CompletableFuture<ClassLoader> contextClassLoader = testEndpoint.doCallAsync(); + assertIsFlinkClassLoader(contextClassLoader.get()); + } + } + + @Test + public void testAkkaRpcActor_RunAsyncCalledWithFlinkContextClassLoader() throws Exception { + try (final TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService)) { + testEndpoint.start(); + + final CompletableFuture<ClassLoader> contextClassLoader = testEndpoint.doRunAsync(); + assertIsFlinkClassLoader(contextClassLoader.get()); + } + } + + @Test + public void testAkkaRpcActor_RPCReturningVoidCalledWithFlinkContextClassLoader() + throws Exception { + try (final TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService)) { + testEndpoint.start(); + + final TestEndpointGateway testEndpointGateway = + akkaRpcService + .connect(testEndpoint.getAddress(), TestEndpointGateway.class) + .get(); + testEndpointGateway.doSomethingWithoutReturningAnything(); + + assertIsFlinkClassLoader(testEndpoint.voidOperationClassLoader.get()); + } + } + + @Test + public void testAkkaRpcActor_RPCCalledWithFlinkContextClassLoader() throws Exception { + try (final TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService)) { + testEndpoint.start(); + + final TestEndpointGateway testEndpointGateway = + akkaRpcService + .connect(testEndpoint.getAddress(), TestEndpointGateway.class) + .get(); + final ClassLoader contextClassLoader = + testEndpointGateway.getContextClassLoader().get(); + assertIsFlinkClassLoader(contextClassLoader); + } + } + + @Test + public void testAkkaRpcInvocationHandler_RPCFutureCompletedWithFlinkContextClassLoader() + throws Exception { + try (final TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService)) { + testEndpoint.start(); + + final TestEndpointGateway testEndpointGateway = + akkaRpcService + .connect(testEndpoint.getAddress(), TestEndpointGateway.class) + .get(); + final CompletableFuture<ClassLoader> contextClassLoader = + runWithContextClassLoader( + () -> + testEndpointGateway + .doSomethingAsync() + .thenApply( + ignored -> + Thread.currentThread() + .getContextClassLoader()), + testClassLoader); + testEndpoint.completeRPCFuture(); + + assertIsFlinkClassLoader(contextClassLoader.get()); + } + } + + @Test + public void testSupervisorActor_TerminationFutureCompletedWithWithFlinkContextClassLoader() Review comment: `TerminationFutureCompletedWithFlinkContextClassLoader` ########## File path: flink-rpc/flink-rpc-akka-loader/src/main/java/org/apache/flink/runtime/rpc/akka/CleanupOnCloseRpcSystem.java ########## @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rpc.akka; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.classloading.SubmoduleClassLoader; +import org.apache.flink.runtime.rpc.AddressResolution; +import org.apache.flink.runtime.rpc.RpcSystem; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.UnknownHostException; +import java.nio.file.Files; +import java.nio.file.Path; + +/** An {@link RpcSystem} wrapper that cleans up resources after the RPC system has been closed. */ +class CleanupOnCloseRpcSystem implements RpcSystem { + private static final Logger LOG = LoggerFactory.getLogger(CleanupOnCloseRpcSystem.class); + + private final RpcSystem rpcSystem; + private final SubmoduleClassLoader pluginLoader; + private final Path tempFile; + + public CleanupOnCloseRpcSystem( + RpcSystem rpcSystem, SubmoduleClassLoader pluginLoader, Path tempFile) { + this.rpcSystem = Preconditions.checkNotNull(rpcSystem); + this.pluginLoader = Preconditions.checkNotNull(pluginLoader); + this.tempFile = Preconditions.checkNotNull(tempFile); + } + + @Override + public void close() { + rpcSystem.close(); + try { + pluginLoader.close(); + } catch (IOException e) { + LOG.warn("Could not close RpcSystem classloader.", e); + } + try { + Files.delete(tempFile); + } catch (IOException e) { + LOG.warn("Could not delete temporary rpc system file {}.", tempFile, e); + } Review comment: I am wondering whether we shouldn't catch `Exception` in order to also catch `RuntimeException` during closing and continue with the clean up. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
