This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit f2ef60c4fd5470995c4acd6e6f96c6cd6bde84c3 Author: Chesnay Schepler <[email protected]> AuthorDate: Tue Jun 7 10:51:08 2022 +0200 [FLINK-27933][coordination][tests] Force serialization applies to return value --- .../flink/runtime/rpc/akka/AkkaRpcActor.java | 22 ++++++++++++++++------ .../flink/runtime/rpc/akka/AkkaRpcService.java | 1 + .../flink/runtime/rpc/akka/FencedAkkaRpcActor.java | 9 ++++++++- 3 files changed, 25 insertions(+), 7 deletions(-) diff --git a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java index 9990033c2c1..8029956bbd9 100644 --- a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java +++ b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.rpc.akka; +import org.apache.flink.runtime.rpc.Local; import org.apache.flink.runtime.rpc.MainThreadValidatorUtil; import org.apache.flink.runtime.rpc.RpcEndpoint; import org.apache.flink.runtime.rpc.RpcGateway; @@ -100,6 +101,8 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends AbstractActor { private final AtomicBoolean rpcEndpointStopped; + private final boolean forceSerialization; + private volatile RpcEndpointTerminationResult rpcEndpointTerminationResult; @Nonnull private State state; @@ -109,11 +112,13 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends AbstractActor { final CompletableFuture<Boolean> terminationFuture, final int version, final long maximumFramesize, + final boolean forceSerialization, final ClassLoader flinkClassLoader) { checkArgument(maximumFramesize > 0, "Maximum framesize must be positive."); this.rpcEndpoint = checkNotNull(rpcEndpoint, "rpc endpoint"); this.flinkClassLoader = checkNotNull(flinkClassLoader); + this.forceSerialization = forceSerialization; this.mainThreadValidator = new 
MainThreadValidatorUtil(rpcEndpoint); this.terminationFuture = checkNotNull(terminationFuture); this.version = version; @@ -314,12 +319,14 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends AbstractActor { } final String methodName = rpcMethod.getName(); + final boolean isLocalRpcInvocation = + rpcMethod.getAnnotation(Local.class) != null; if (result instanceof CompletableFuture) { final CompletableFuture<?> responseFuture = (CompletableFuture<?>) result; - sendAsyncResponse(responseFuture, methodName); + sendAsyncResponse(responseFuture, methodName, isLocalRpcInvocation); } else { - sendSyncResponse(result, methodName); + sendSyncResponse(result, methodName, isLocalRpcInvocation); } } } catch (Throwable e) { @@ -330,8 +337,9 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends AbstractActor { } } - private void sendSyncResponse(Object response, String methodName) { - if (isRemoteSender(getSender())) { + private void sendSyncResponse( + Object response, String methodName, boolean isLocalRpcInvocation) { + if (isRemoteSender(getSender()) || (forceSerialization && !isLocalRpcInvocation)) { Either<AkkaRpcSerializedValue, AkkaRpcException> serializedResult = serializeRemoteResultAndVerifySize(response, methodName); @@ -345,7 +353,8 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends AbstractActor { } } - private void sendAsyncResponse(CompletableFuture<?> asyncResponse, String methodName) { + private void sendAsyncResponse( + CompletableFuture<?> asyncResponse, String methodName, boolean isLocalRpcInvocation) { final ActorRef sender = getSender(); Promise.DefaultPromise<Object> promise = new Promise.DefaultPromise<>(); @@ -355,7 +364,8 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends AbstractActor { if (throwable != null) { promise.failure(throwable); } else { - if (isRemoteSender(sender)) { + if (isRemoteSender(sender) + || (forceSerialization && !isLocalRpcInvocation)) { Either<AkkaRpcSerializedValue, 
AkkaRpcException> serializedResult = serializeRemoteResultAndVerifySize( diff --git a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java index 91a438242fe..e5a20c3dc07 100644 --- a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java +++ b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java @@ -349,6 +349,7 @@ public class AkkaRpcService implements RpcService { actorTerminationFuture, getVersion(), configuration.getMaximumFramesize(), + configuration.isForceRpcInvocationSerialization(), flinkClassLoader), rpcEndpoint.getEndpointId()); diff --git a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java index af901b02542..8e034e696b4 100644 --- a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java +++ b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java @@ -45,8 +45,15 @@ public class FencedAkkaRpcActor<F extends Serializable, T extends FencedRpcEndpo CompletableFuture<Boolean> terminationFuture, int version, final long maximumFramesize, + final boolean forceSerialization, ClassLoader flinkClassLoader) { - super(rpcEndpoint, terminationFuture, version, maximumFramesize, flinkClassLoader); + super( + rpcEndpoint, + terminationFuture, + version, + maximumFramesize, + forceSerialization, + flinkClassLoader); } @Override
