tillrohrmann commented on a change in pull request #16345:
URL: https://github.com/apache/flink/pull/16345#discussion_r666109090
##########
File path:
flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
##########
@@ -332,28 +333,33 @@ public AkkaRpcService createAndStart(
final ActorSystem actorSystem;
- if (externalAddress == null) {
- // create local actor system
- actorSystem =
- AkkaBootstrapTools.startLocalActorSystem(
- configuration,
- actorSystemName,
- logger,
- actorSystemExecutorConfiguration,
- customConfig);
- } else {
- // create remote actor system
- actorSystem =
- AkkaBootstrapTools.startRemoteActorSystem(
- configuration,
- actorSystemName,
- externalAddress,
- externalPortRange,
- bindAddress,
- Optional.ofNullable(bindPort),
- logger,
- actorSystemExecutorConfiguration,
- customConfig);
+ // akka internally caches the context class loader
+ // make sure it uses the plugin class loader
+ try (TemporaryClassLoaderContext ignored =
+
TemporaryClassLoaderContext.of(getClass().getClassLoader())) {
Review comment:
Maybe this is something which could happen at the `Flink <> RpcSystem`
boundary. Once we enter the `RpcSystem`, we set the correct class loader?
##########
File path: flink-runtime/pom.xml
##########
@@ -304,6 +316,33 @@ under the License.
</execution>
</executions>
</plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>copy-rpc-akka-jars</id>
+ <phase>process-resources</phase>
+ <goals>
+ <goal>copy</goal>
+ </goals>
+ <configuration>
+ <artifactItems>
+ <artifactItem>
+
<groupId>org.apache.flink</groupId>
+
<artifactId>flink-rpc-akka</artifactId>
+
<version>${project.version}</version>
+
<type>jar</type>
+
<overWrite>true</overWrite>
+
<destFileName>flink-rpc-akka.jar</destFileName>
+ </artifactItem>
+ </artifactItems>
+
<outputDirectory>${project.build.directory}/classes</outputDirectory>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
Review comment:
If we make the `AkkaRpcService` loading the responsibility of
`flink-rpc-akka`, then we could remove this part.
##########
File path:
flink-core/src/main/java/org/apache/flink/core/classloading/ComponentClassLoader.java
##########
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.classloading;
+
+import org.apache.flink.util.function.FunctionWithException;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Iterators;
+
+import java.io.IOException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.Arrays;
+import java.util.Enumeration;
+import java.util.Iterator;
+
+/**
+ * A {@link URLClassLoader} that restricts which classes can be loaded to
those contained within the
+ * given classpath, except classes from a given set of packages that are
either loaded parent or
+ * child-first.
+ */
+public class ComponentClassLoader extends URLClassLoader {
+ private static final ClassLoader PLATFORM_OR_BOOTSTRAP_LOADER;
+
+ private final ClassLoader parentClassLoader;
+
+ private final String[] parentFirstPackages;
+ private final String[] childFirstPackages;
+ private final String[] parentFirstResourcePrefixes;
+ private final String[] childFirstResourcePrefixes;
+
+ public ComponentClassLoader(
+ URL[] classpath,
+ ClassLoader parentClassLoader,
Review comment:
Is this really the parent class loader? The constructor looks as if
`PLATFORM_OR_BOOTSTRAP_LOADER` is the parent class loader.
##########
File path:
flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcSystem.java
##########
@@ -86,8 +95,28 @@ static RpcSystem load() {
* @return loaded RpcSystem
*/
static RpcSystem load(Configuration config) {
- final ClassLoader classLoader = RpcSystem.class.getClassLoader();
- return ServiceLoader.load(RpcSystem.class,
classLoader).iterator().next();
+ try {
+ final ClassLoader classLoader = RpcSystem.class.getClassLoader();
+
+ final String tmpDirectory =
ConfigurationUtils.parseTempDirectories(config)[0];
+ final Path tempFile = Files.createFile(Paths.get(tmpDirectory,
"flink-rpc-akka.jar"));
+ IOUtils.copyBytes(
+ classLoader.getResourceAsStream("flink-rpc-akka.jar"),
+ Files.newOutputStream(tempFile));
+
+ final PluginLoader pluginLoader =
+ PluginLoader.create(
+ new PluginDescriptor(
+ "flink-rpc-akka",
+ new URL[] {tempFile.toUri().toURL()},
+ new String[0]),
+ classLoader,
+
CoreOptions.getPluginParentFirstLoaderPatterns(config));
+ return new PluginLoaderClosingRpcSystem(
+ pluginLoader.load(RpcSystem.class).next(), pluginLoader);
Review comment:
Should we make this specific loading of `flink-rpc-akka` an
implementation detail of `flink-rpc-akka`? We could have a `RpcSystem`
implementation in `flink-rpc-akka.jar` that extracts the `akka-rpc.jar` that
contains the specific `AkkaRpcSystem`. That way make `flink-rpc` independent of
the specific implementation (e.g. there might be `RpcSystem` implementations
which don't have to use a dedicated `SubmoduleClassLoader`.
##########
File path:
flink-core/src/main/java/org/apache/flink/core/classloading/ComponentClassLoader.java
##########
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.classloading;
+
+import org.apache.flink.util.function.FunctionWithException;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Iterators;
+
+import java.io.IOException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.Arrays;
+import java.util.Enumeration;
+import java.util.Iterator;
+
+/**
+ * A {@link URLClassLoader} that restricts which classes can be loaded to
those contained within the
+ * given classpath, except classes from a given set of packages that are
either loaded parent or
+ * child-first.
+ */
+public class ComponentClassLoader extends URLClassLoader {
+ private static final ClassLoader PLATFORM_OR_BOOTSTRAP_LOADER;
+
+ private final ClassLoader parentClassLoader;
+
+ private final String[] parentFirstPackages;
+ private final String[] childFirstPackages;
Review comment:
I am not sure whether we should call these fields parent and child
because it looks a bit more like two independent class loaders (Flink and
plugin, left and right, owner and component).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]