tillrohrmann commented on a change in pull request #16345:
URL: https://github.com/apache/flink/pull/16345#discussion_r665694990
##########
File path:
flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
##########
@@ -461,7 +469,7 @@ public void stopServer(RpcServer selfGateway) {
@Override
public Executor getExecutor() {
- return actorSystem.dispatcher();
+ return internalExecutor;
Review comment:
Instead of returning the dispatcher, we could maybe either introduce a
dedicated executor or remove it altogether. I know that this is not strictly
related to this PR here.
##########
File path:
flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
##########
@@ -137,6 +138,13 @@ public AkkaRpcService(
captureAskCallstacks = configuration.captureAskCallStack();
+ // Akka always sets the threads context class loader to the class
loader with which it was
+ // loaded (i.e., the plugin class loader)
+ // we must ensure that the context class loader is set to the Flink
class loader when we
+ // call into Flink
+ // otherwise we could leak the plugin class loader or poison the
context class leader of
Review comment:
typo: context class loader
##########
File path:
flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
##########
@@ -480,14 +488,13 @@ public ScheduledExecutor getScheduledExecutor() {
@Override
public void execute(Runnable runnable) {
- actorSystem.dispatcher().execute(runnable);
+ internalExecutor.execute(runnable);
Review comment:
I'd be in favour of getting rid of this method.
##########
File path:
flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
##########
@@ -144,12 +147,21 @@ public void postStop() throws Exception {
@Override
public Receive createReceive() {
return ReceiveBuilder.create()
- .match(RemoteHandshakeMessage.class,
this::handleHandshakeMessage)
- .match(ControlMessages.class, this::handleControlMessage)
- .matchAny(this::handleMessage)
+ .match(
+ RemoteHandshakeMessage.class,
+
withCleanContextClassLoader(this::handleHandshakeMessage))
+ .match(
+ ControlMessages.class,
+
withCleanContextClassLoader(this::handleControlMessage))
+ .matchAny(withCleanContextClassLoader(this::handleMessage))
Review comment:
Why do we need this exactly?
##########
File path:
flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
##########
@@ -229,22 +230,29 @@ private Object invokeRpc(Method method, Object[] args)
throws Exception {
final CompletableFuture<Object> completableFuture = new
CompletableFuture<>();
resultFuture.whenComplete(
- (resultValue, failure) -> {
- if (failure != null) {
- completableFuture.completeExceptionally(
- resolveTimeoutException(failure,
callStackCapture, method));
- } else {
- completableFuture.complete(
- deserializeValueIfNeeded(resultValue,
method));
- }
- });
+ (resultValue, failure) ->
+ ClassLoadingUtils.runWithFlinkContextClassLoader(
+ () -> {
+ if (failure != null) {
+
completableFuture.completeExceptionally(
+ resolveTimeoutException(
+ failure,
callStackCapture, method));
+ } else {
+ completableFuture.complete(
+
deserializeValueIfNeeded(resultValue, method));
+ }
+ }));
if (Objects.equals(returnType, CompletableFuture.class)) {
result = completableFuture;
} else {
try {
result =
- completableFuture.get(futureTimeout.getSize(),
futureTimeout.getUnit());
+ ClassLoadingUtils.runWithFlinkContextClassLoader(
+ () ->
+ completableFuture.get(
+ futureTimeout.getSize(),
+ futureTimeout.getUnit()));
Review comment:
Why is this needed?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]