tillrohrmann commented on a change in pull request #16345:
URL: https://github.com/apache/flink/pull/16345#discussion_r665694990



##########
File path: flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
##########
@@ -461,7 +469,7 @@ public void stopServer(RpcServer selfGateway) {
 
     @Override
     public Executor getExecutor() {
-        return actorSystem.dispatcher();
+        return internalExecutor;

Review comment:
       Instead of returning the dispatcher, we could either introduce a dedicated executor or remove this method altogether. I know this is not strictly related to this PR.
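
       To make the "dedicated executor" option concrete, here is a minimal sketch. `ContextClassLoaderExecutorSketch` is a hypothetical name that does not appear in the PR, and the sketch uses only `java.util.concurrent.Executor`. It assumes the goal is that every task runs with a fixed context class loader regardless of which thread the delegate uses.

```java
import java.util.concurrent.Executor;

/** Hypothetical wrapper, not part of the PR: pins the context class loader for each task. */
final class ContextClassLoaderExecutorSketch implements Executor {

    private final Executor delegate;
    private final ClassLoader contextClassLoader;

    ContextClassLoaderExecutorSketch(Executor delegate, ClassLoader contextClassLoader) {
        this.delegate = delegate;
        this.contextClassLoader = contextClassLoader;
    }

    @Override
    public void execute(Runnable command) {
        delegate.execute(
                () -> {
                    final Thread thread = Thread.currentThread();
                    final ClassLoader previous = thread.getContextClassLoader();
                    thread.setContextClassLoader(contextClassLoader);
                    try {
                        command.run();
                    } finally {
                        // Restore the loader so the worker thread is left as we found it.
                        thread.setContextClassLoader(previous);
                    }
                });
    }
}
```

       Callers of `getExecutor()` would then get the wrapper instead of the raw dispatcher. Whether that is preferable to removing the method is the open question above.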

##########
File path: flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
##########
@@ -137,6 +138,13 @@ public AkkaRpcService(
 
         captureAskCallstacks = configuration.captureAskCallStack();
 
+        // Akka always sets the threads context class loader to the class loader with which it was
+        // loaded (i.e., the plugin class loader)
+        // we must ensure that the context class loader is set to the Flink class loader when we
+        // call into Flink
+        // otherwise we could leak the plugin class loader or poison the context class leader of

Review comment:
       typo: context class loader
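
       For readers following the hunk above, the swap-and-restore idea the code comment describes can be sketched as follows. This is an illustration only: `ContextClassLoaderSketch` and `supplyWithContextClassLoader` are hypothetical names, and the actual helper in the PR (`ClassLoadingUtils.runWithFlinkContextClassLoader`) may differ in signature and behaviour.

```java
import java.util.function.Supplier;

/** Hypothetical helper, not the PR's ClassLoadingUtils. */
final class ContextClassLoaderSketch {

    private ContextClassLoaderSketch() {}

    /**
     * Runs the action with the given loader as the current thread's context class loader and
     * restores the previous loader afterwards, even if the action throws.
     */
    static <T> T supplyWithContextClassLoader(ClassLoader target, Supplier<T> action) {
        final Thread thread = Thread.currentThread();
        final ClassLoader previous = thread.getContextClassLoader();
        thread.setContextClassLoader(target);
        try {
            return action.get();
        } finally {
            thread.setContextClassLoader(previous);
        }
    }
}
```

       The `finally` block is the important part: without it, an exception in the action would leave the thread with the wrong context class loader.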

##########
File path: flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
##########
@@ -480,14 +488,13 @@ public ScheduledExecutor getScheduledExecutor() {
 
     @Override
     public void execute(Runnable runnable) {
-        actorSystem.dispatcher().execute(runnable);
+        internalExecutor.execute(runnable);

Review comment:
       I'd be in favour of getting rid of this method.

##########
File path: flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
##########
@@ -144,12 +147,21 @@ public void postStop() throws Exception {
     @Override
     public Receive createReceive() {
         return ReceiveBuilder.create()
-                .match(RemoteHandshakeMessage.class, this::handleHandshakeMessage)
-                .match(ControlMessages.class, this::handleControlMessage)
-                .matchAny(this::handleMessage)
+                .match(
+                        RemoteHandshakeMessage.class,
+                        withCleanContextClassLoader(this::handleHandshakeMessage))
+                .match(
+                        ControlMessages.class,
+                        withCleanContextClassLoader(this::handleControlMessage))
+                .matchAny(withCleanContextClassLoader(this::handleMessage))

Review comment:
       Why do we need this exactly?
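
       For context, the wrapping pattern in this hunk can be sketched as below. The body of `withCleanContextClassLoader` is not shown in the diff, so this is an assumption about its shape. It uses `java.util.function.Consumer` in place of Akka's handler type, and `HandlerWrappingSketch` is a hypothetical name.

```java
import java.util.function.Consumer;

/** Hypothetical illustration of wrapping a message handler; not the PR's implementation. */
final class HandlerWrappingSketch {

    private HandlerWrappingSketch() {}

    /** Returns a handler that runs the given handler under the target context class loader. */
    static <T> Consumer<T> withContextClassLoader(ClassLoader target, Consumer<T> handler) {
        return message -> {
            final Thread thread = Thread.currentThread();
            final ClassLoader previous = thread.getContextClassLoader();
            thread.setContextClassLoader(target);
            try {
                handler.accept(message);
            } finally {
                // Put back whatever loader the calling thread had before.
                thread.setContextClassLoader(previous);
            }
        };
    }
}
```

       If the actor's threads already carry the expected loader, the wrapper is redundant, which is presumably what the question above is probing.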

##########
File path: flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
##########
@@ -229,22 +230,29 @@ private Object invokeRpc(Method method, Object[] args) throws Exception {
 
             final CompletableFuture<Object> completableFuture = new CompletableFuture<>();
             resultFuture.whenComplete(
-                    (resultValue, failure) -> {
-                        if (failure != null) {
-                            completableFuture.completeExceptionally(
-                                    resolveTimeoutException(failure, callStackCapture, method));
-                        } else {
-                            completableFuture.complete(
-                                    deserializeValueIfNeeded(resultValue, method));
-                        }
-                    });
+                    (resultValue, failure) ->
+                            ClassLoadingUtils.runWithFlinkContextClassLoader(
+                                    () -> {
+                                        if (failure != null) {
+                                            completableFuture.completeExceptionally(
+                                                    resolveTimeoutException(
+                                                            failure, callStackCapture, method));
+                                        } else {
+                                            completableFuture.complete(
+                                                    deserializeValueIfNeeded(resultValue, method));
+                                        }
+                                    }));
 
             if (Objects.equals(returnType, CompletableFuture.class)) {
                 result = completableFuture;
             } else {
                 try {
                     result =
-                            completableFuture.get(futureTimeout.getSize(), futureTimeout.getUnit());
+                            ClassLoadingUtils.runWithFlinkContextClassLoader(
+                                    () ->
+                                            completableFuture.get(
+                                                    futureTimeout.getSize(),
+                                                    futureTimeout.getUnit()));

Review comment:
       Why is this needed?



