AHeise commented on a change in pull request #16345:
URL: https://github.com/apache/flink/pull/16345#discussion_r662972004



##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/MetricUtilsTest.java
##########
@@ -83,16 +81,24 @@ public void testStartMetricActorSystemRespectsThreadPriority() throws Exception
         final RpcService rpcService =
                 MetricUtils.startRemoteMetricsRpcService(
                         configuration, "localhost", RpcSystem.load());
-        assertThat(rpcService, instanceOf(AkkaRpcService.class));
-
-        final ActorSystem actorSystem = ((AkkaRpcService) rpcService).getActorSystem();
 
         try {
+            // dirty reflection code to avoid ClassCastExceptions
+            final Method getActorSystem = rpcService.getClass().getMethod("getActorSystem");

Review comment:
       Here it is: my archenemy, reflection. Why can't we use a provided dependency on Akka?
   
   Btw, I just realized on a second look that this is test-only. Please update the commit message. Is there something like testProvided in Maven?
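   The reflection in the diff works around a `ClassCastException`. Presumably the test sees a copy of `AkkaRpcService` loaded by a different class loader than the one inside the plugin, and the JVM treats two classes with the same name but different defining loaders as distinct types, so a plain cast fails. A minimal sketch of the reflective pattern, using a hypothetical helper `ReflectiveGetter` that is not part of Flink:

```java
import java.lang.reflect.Method;

/**
 * Hypothetical helper: calls a public no-arg method by name, so the caller
 * needs no compile-time dependency on the target's class.
 */
public final class ReflectiveGetter {

    private ReflectiveGetter() {}

    public static <T> T invokeGetter(Object target, String methodName, Class<T> expectedType)
            throws ReflectiveOperationException {
        // Resolve the method on the runtime class; the caller only sees Object.
        final Method getter = target.getClass().getMethod(methodName);
        // invoke() boxes primitive return values (e.g. int -> Integer).
        final Object result = getter.invoke(target);
        // Class.cast throws ClassCastException on an unexpected type, including
        // when the "same" class was defined by a different class loader.
        return expectedType.cast(result);
    }
}
```

   The returned object still cannot be cast to a type from the test's own class loader, so with this approach the test can only keep using it reflectively or through types shared via a common parent loader.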

##########
File path: flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
##########
@@ -137,6 +138,13 @@ public AkkaRpcService(
 
         captureAskCallstacks = configuration.captureAskCallStack();
 
+        // Akka always sets the threads context class loader to the class loader with which it was
+        // loaded (i.e., the plugin class loader)
+        // we must ensure that the context class loader is set to the Flink class loader when we
+        // call into Flink
+        // otherwise we could leak the plugin class loader or poison the context class leader of
+        // external threads (because they inherit the current threads context class loader)
+        internalExecutor = new ContextClassLoaderCleaningExecutor(actorSystem.dispatcher());

Review comment:
       The Akka submodule usually lives as long as the cluster, right?
   So most of the changes in this commit are really for the MiniCluster, where the JVM can outlive the submodule of a specific cluster.
   I'd quickly like to discuss whether the benefit outweighs the cost. On the cost side, it pretty much means that for every RPC call we switch the context class loader 3 times (before the call, Akka switches and then we switch; after the call, we switch back). Did we ever measure the performance of an RPC call, and do we have an idea whether this back-and-forth is noticeable? The benefit would be proper isolation in tests. I tend to think that it's worth it, but I have no idea how many RPC calls there are and how heavy they are beyond this switcheroo.
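   The per-call cost above comes from wrapping every task that the dispatcher runs. A minimal sketch of such a wrapper, assuming a plain `java.util.concurrent.Executor` delegate; `ContextClassLoaderSwitchingExecutor` is a hypothetical name and not the `ContextClassLoaderCleaningExecutor` from the diff. It shows the two switches on our side (set before the task, restore after), on top of whatever Akka itself does:

```java
import java.util.concurrent.Executor;

/**
 * Hypothetical Executor decorator: runs every task with a fixed context
 * class loader and restores the thread's previous one afterwards.
 */
public final class ContextClassLoaderSwitchingExecutor implements Executor {

    private final Executor delegate;
    private final ClassLoader contextClassLoader;

    public ContextClassLoaderSwitchingExecutor(Executor delegate, ClassLoader contextClassLoader) {
        this.delegate = delegate;
        this.contextClassLoader = contextClassLoader;
    }

    @Override
    public void execute(Runnable task) {
        delegate.execute(
                () -> {
                    final Thread thread = Thread.currentThread();
                    // Remember whatever the pool thread had (e.g. the plugin loader).
                    final ClassLoader previous = thread.getContextClassLoader();
                    // Switch 1: run the task with the desired (e.g. Flink) loader.
                    thread.setContextClassLoader(contextClassLoader);
                    try {
                        task.run();
                    } finally {
                        // Switch 2: leave the pool thread as we found it.
                        thread.setContextClassLoader(previous);
                    }
                });
    }
}
```

   Each task pays two `setContextClassLoader` calls plus one extra lambda allocation; whether that is noticeable next to the actual RPC work would need a benchmark.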

##########
File path: flink-clients/pom.xml
##########
@@ -47,13 +47,13 @@ under the License.
 
                <dependency>
                        <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-runtime_${scala.binary.version}</artifactId>
+                       <artifactId>flink-runtime</artifactId>

Review comment:
       🤩 

##########
File path: flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcSystem.java
##########
@@ -86,8 +95,28 @@ static RpcSystem load() {
      * @return loaded RpcSystem
      */
     static RpcSystem load(Configuration config) {
-        final ClassLoader classLoader = RpcSystem.class.getClassLoader();
-        return ServiceLoader.load(RpcSystem.class, classLoader).iterator().next();
+        try {
+            final ClassLoader classLoader = RpcSystem.class.getClassLoader();
+
+            final String tmpDirectory = ConfigurationUtils.parseTempDirectories(config)[0];
+            final Path tempFile = Files.createFile(Paths.get(tmpDirectory, "flink-rpc-akka.jar"));
+            IOUtils.copyBytes(
+                    classLoader.getResourceAsStream("flink-rpc-akka.jar"),
+                    Files.newOutputStream(tempFile));
+
+            final PluginLoader pluginLoader =
+                    PluginLoader.create(
+                            new PluginDescriptor(
+                                    "flink-rpc-akka",
+                                    new URL[] {tempFile.toUri().toURL()},
+                                    new String[0]),
+                            classLoader,
+                            CoreOptions.getPluginParentFirstLoaderPatterns(config));

Review comment:
       Shouldn't we strip the patterns down to the bare minimum? I fear that a user changing them could have a big impact.
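   To make the parent-first discussion concrete: with prefix-based patterns, a class is delegated to the parent (Flink) class loader whenever its name starts with any pattern. A minimal sketch with a hypothetical helper and hypothetical pattern lists (not Flink's actual defaults), showing why a broad `org.apache.flink.` entry also captures the Akka RPC classes while a stripped-down list does not:

```java
import java.util.Arrays;

/** Hypothetical helper illustrating prefix-based parent-first matching. */
public final class ParentFirstPatterns {

    private ParentFirstPatterns() {}

    /** Returns true if the class should be loaded parent-first, i.e. from the Flink class loader. */
    public static boolean isParentFirst(String className, String[] patterns) {
        // Plain prefix match: "org.apache.flink." matches every class in every Flink package.
        return Arrays.stream(patterns).anyMatch(className::startsWith);
    }

    /** Hypothetical broad list: the plugin's own o.a.f classes would come from the parent. */
    public static final String[] BROAD = {"java.", "org.apache.flink.", "org.slf4j"};

    /** Hypothetical stripped-down list: only the shared API is parent-first. */
    public static final String[] MINIMAL = {
        "java.", "org.apache.flink.core.", "org.apache.flink.runtime.rpc.RpcSystem"
    };
}
```

   With `BROAD`, `isParentFirst("org.apache.flink.runtime.rpc.akka.AkkaRpcService", ...)` returns `true`; with `MINIMAL` it returns `false`. The narrower the list, the less a user-supplied addition can change which copy of a class wins.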

##########
File path: flink-core/src/main/java/org/apache/flink/core/plugin/PluginLoader.java
##########
@@ -177,16 +177,16 @@ public P next() {
                     return resolveIfNeeded(resolve, loadedClass);
                 }
 
-                if (isAllowedFlinkClass(name)) {
-                    try {
+                try {

Review comment:
       We should probably have a separate CL if it doesn't work by default. I wonder what the issue is.
   
   The original version goes to the Flink CL for all `o.a.f` classes (=API). Now we first load from the plugin. I'm assuming that if a user screws up and bundles `flink-core` etc. (the original motivation for the parent.first list), all hell breaks loose.
   
   Do we have an issue here because Akka RPC also resides in `o.a.f`? It might be enough to strip down the parent.first list, though... If it doesn't contain `o.a.f`, the change is probably unneeded.
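   The trade-off can be illustrated with a generic child-first class loader: names matching a parent-first prefix always go to the parent, and everything else is looked up in the plugin's own jars first and only then in the parent. This is a hypothetical sketch (`ChildFirstClassLoader` is not Flink's `PluginLoader`); it shows why dropping `o.a.f` from the parent-first list lets the plugin's own `o.a.f` classes win, and also why a plugin jar that bundles `flink-core` could then shadow Flink's classes:

```java
import java.net.URL;
import java.net.URLClassLoader;

/** Hypothetical child-first loader with a configurable parent-first allowlist. */
public class ChildFirstClassLoader extends URLClassLoader {

    private final String[] parentFirstPrefixes;

    public ChildFirstClassLoader(URL[] urls, ClassLoader parent, String[] parentFirstPrefixes) {
        super(urls, parent);
        this.parentFirstPrefixes = parentFirstPrefixes.clone();
    }

    @Override
    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        synchronized (getClassLoadingLock(name)) {
            Class<?> loaded = findLoadedClass(name);
            if (loaded == null) {
                if (isParentFirst(name)) {
                    // Shared API: always take the parent's copy.
                    loaded = super.loadClass(name, false);
                } else {
                    try {
                        // Child first: look in this loader's own URLs.
                        loaded = findClass(name);
                    } catch (ClassNotFoundException e) {
                        // Not bundled in the plugin: fall back to the parent.
                        loaded = super.loadClass(name, false);
                    }
                }
            }
            if (resolve) {
                resolveClass(loaded);
            }
            return loaded;
        }
    }

    private boolean isParentFirst(String name) {
        // JDK classes must never be defined by a child loader.
        if (name.startsWith("java.")) {
            return true;
        }
        for (String prefix : parentFirstPrefixes) {
            if (name.startsWith(prefix)) {
                return true;
            }
        }
        return false;
    }
}
```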

##########
File path: flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcSystem.java
##########
@@ -74,8 +87,39 @@ RpcServiceBuilder withExecutorConfiguration(
      * @return loaded RpcSystem
      */
     static RpcSystem load() {
-        final ClassLoader classLoader = RpcSystem.class.getClassLoader();
-        return ServiceLoader.load(RpcSystem.class, classLoader).iterator().next();
+        return load(new Configuration());
+    }
+
+    /**
+     * Loads the RpcSystem.
+     *
+     * @return loaded RpcSystem
+     */
+    static RpcSystem load(Configuration config) {
+        try {
+            final ClassLoader classLoader = RpcSystem.class.getClassLoader();
+
+            final String tmpDirectory = ConfigurationUtils.parseTempDirectories(config)[0];
+            final Path tempFile =
+                    Files.createFile(
+                            Paths.get(tmpDirectory, UUID.randomUUID() + "_flink-rpc-akka.jar"));

Review comment:
       Intuitively, I'd agree, but I have no idea about the effort.
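   The revised hunk prefixes the extracted jar with a random UUID instead of using the fixed name `flink-rpc-akka.jar`; with a fixed name, a second `Files.createFile` in the same temp directory fails because the file already exists. A minimal sketch of the extraction step, with a hypothetical helper `UniqueFileExtractor` (not Flink code) that uses `Files.copy`, which also refuses to overwrite unless `REPLACE_EXISTING` is passed:

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.UUID;

/** Hypothetical helper: copies a stream (e.g. the bundled RPC jar) to a uniquely named file. */
public final class UniqueFileExtractor {

    private UniqueFileExtractor() {}

    public static Path copyToUniqueFile(InputStream in, Path directory, String fileNameSuffix)
            throws IOException {
        // A random prefix lets several loaders sharing one temp directory
        // (e.g. several clusters started in one JVM) extract the same jar.
        final Path target = directory.resolve(UUID.randomUUID() + "_" + fileNameSuffix);
        // Without REPLACE_EXISTING, Files.copy throws FileAlreadyExistsException
        // rather than overwriting an existing file.
        try (InputStream source = in) {
            Files.copy(source, target);
        }
        return target;
    }
}
```

   The caller stays responsible for deleting the extracted file, for instance when the RPC system's resources are cleaned up.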

##########
File path: flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterITCase.java
##########
@@ -47,22 +47,23 @@
 
     @Test
     public void testRejectionOfEmptyJobGraphs() throws Exception {
-        MiniCluster miniCluster =
-                new MiniCluster(
-                        new MiniClusterConfiguration.Builder()
-                                .setNumTaskManagers(1)
-                                .setNumSlotsPerTaskManager(1)
+        MiniClusterResource miniCluster =

Review comment:
       What's the benefit here?

##########
File path: flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcSystem.java
##########
@@ -51,6 +51,9 @@ RpcServiceBuilder remoteServiceBuilder(
             @Nullable String externalAddress,
             String externalPortRange);
 
+    /** Hook to cleanup resources, like common thread pools or classloaders. */
+    void cleanup();

Review comment:
       Could we use `Closeable`? That makes it easier for code analysis tools to find leaks.
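   If the cleanup hook is exposed through `AutoCloseable` (which `java.io.Closeable` extends), callers can use try-with-resources and static analysis can flag instances that are never closed. A hypothetical sketch, not Flink's `RpcSystem` interface; here the interface narrows `close()` so it declares no checked exception and try-with-resources needs no catch block:

```java
/** Hypothetical sketch: a cleanup hook exposed through AutoCloseable. */
public final class CloseableRpcSystemSketch {

    private CloseableRpcSystemSketch() {}

    /** Stand-in for an RPC system abstraction; not Flink's RpcSystem. */
    public interface RpcSystemLike extends AutoCloseable {
        String name();

        /** Releases thread pools, class loaders, etc. Narrowed: no checked exception. */
        @Override
        void close();
    }

    /** Trivial implementation that only records whether close() was called. */
    public static final class DummyRpcSystem implements RpcSystemLike {
        private boolean closed;

        @Override
        public String name() {
            return "dummy";
        }

        @Override
        public void close() {
            closed = true;
        }

        public boolean isClosed() {
            return closed;
        }
    }

    /** Uses the system in try-with-resources, so close() runs even if the body throws. */
    public static DummyRpcSystem useAndClose() {
        final DummyRpcSystem system = new DummyRpcSystem();
        try (RpcSystemLike rpcSystem = system) {
            System.out.println("using " + rpcSystem.name());
        }
        return system;
    }
}
```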




-- 
This is an automated message from the Apache Git Service.