This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f2ef60c4fd5470995c4acd6e6f96c6cd6bde84c3
Author: Chesnay Schepler <[email protected]>
AuthorDate: Tue Jun 7 10:51:08 2022 +0200

    [FLINK-27933][coordination][tests] Force serialization applies to return 
value
---
 .../flink/runtime/rpc/akka/AkkaRpcActor.java       | 22 ++++++++++++++++------
 .../flink/runtime/rpc/akka/AkkaRpcService.java     |  1 +
 .../flink/runtime/rpc/akka/FencedAkkaRpcActor.java |  9 ++++++++-
 3 files changed, 25 insertions(+), 7 deletions(-)

diff --git 
a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
 
b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
index 9990033c2c1..8029956bbd9 100644
--- 
a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
+++ 
b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.rpc.akka;
 
+import org.apache.flink.runtime.rpc.Local;
 import org.apache.flink.runtime.rpc.MainThreadValidatorUtil;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcGateway;
@@ -100,6 +101,8 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> 
extends AbstractActor {
 
     private final AtomicBoolean rpcEndpointStopped;
 
+    private final boolean forceSerialization;
+
     private volatile RpcEndpointTerminationResult rpcEndpointTerminationResult;
 
     @Nonnull private State state;
@@ -109,11 +112,13 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> 
extends AbstractActor {
             final CompletableFuture<Boolean> terminationFuture,
             final int version,
             final long maximumFramesize,
+            final boolean forceSerialization,
             final ClassLoader flinkClassLoader) {
 
         checkArgument(maximumFramesize > 0, "Maximum framesize must be 
positive.");
         this.rpcEndpoint = checkNotNull(rpcEndpoint, "rpc endpoint");
         this.flinkClassLoader = checkNotNull(flinkClassLoader);
+        this.forceSerialization = forceSerialization;
         this.mainThreadValidator = new MainThreadValidatorUtil(rpcEndpoint);
         this.terminationFuture = checkNotNull(terminationFuture);
         this.version = version;
@@ -314,12 +319,14 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> 
extends AbstractActor {
                     }
 
                     final String methodName = rpcMethod.getName();
+                    final boolean isLocalRpcInvocation =
+                            rpcMethod.getAnnotation(Local.class) != null;
 
                     if (result instanceof CompletableFuture) {
                         final CompletableFuture<?> responseFuture = 
(CompletableFuture<?>) result;
-                        sendAsyncResponse(responseFuture, methodName);
+                        sendAsyncResponse(responseFuture, methodName, 
isLocalRpcInvocation);
                     } else {
-                        sendSyncResponse(result, methodName);
+                        sendSyncResponse(result, methodName, 
isLocalRpcInvocation);
                     }
                 }
             } catch (Throwable e) {
@@ -330,8 +337,9 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> 
extends AbstractActor {
         }
     }
 
-    private void sendSyncResponse(Object response, String methodName) {
-        if (isRemoteSender(getSender())) {
+    private void sendSyncResponse(
+            Object response, String methodName, boolean isLocalRpcInvocation) {
+        if (isRemoteSender(getSender()) || (forceSerialization && 
!isLocalRpcInvocation)) {
             Either<AkkaRpcSerializedValue, AkkaRpcException> serializedResult =
                     serializeRemoteResultAndVerifySize(response, methodName);
 
@@ -345,7 +353,8 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> 
extends AbstractActor {
         }
     }
 
-    private void sendAsyncResponse(CompletableFuture<?> asyncResponse, String 
methodName) {
+    private void sendAsyncResponse(
+            CompletableFuture<?> asyncResponse, String methodName, boolean 
isLocalRpcInvocation) {
         final ActorRef sender = getSender();
         Promise.DefaultPromise<Object> promise = new 
Promise.DefaultPromise<>();
 
@@ -355,7 +364,8 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> 
extends AbstractActor {
                             if (throwable != null) {
                                 promise.failure(throwable);
                             } else {
-                                if (isRemoteSender(sender)) {
+                                if (isRemoteSender(sender)
+                                        || (forceSerialization && 
!isLocalRpcInvocation)) {
                                     Either<AkkaRpcSerializedValue, 
AkkaRpcException>
                                             serializedResult =
                                                     
serializeRemoteResultAndVerifySize(
diff --git 
a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
 
b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
index 91a438242fe..e5a20c3dc07 100644
--- 
a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
+++ 
b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
@@ -349,6 +349,7 @@ public class AkkaRpcService implements RpcService {
                                             actorTerminationFuture,
                                             getVersion(),
                                             
configuration.getMaximumFramesize(),
+                                            
configuration.isForceRpcInvocationSerialization(),
                                             flinkClassLoader),
                             rpcEndpoint.getEndpointId());
 
diff --git 
a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java
 
b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java
index af901b02542..8e034e696b4 100644
--- 
a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java
+++ 
b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java
@@ -45,8 +45,15 @@ public class FencedAkkaRpcActor<F extends Serializable, T 
extends FencedRpcEndpo
             CompletableFuture<Boolean> terminationFuture,
             int version,
             final long maximumFramesize,
+            final boolean forceSerialization,
             ClassLoader flinkClassLoader) {
-        super(rpcEndpoint, terminationFuture, version, maximumFramesize, 
flinkClassLoader);
+        super(
+                rpcEndpoint,
+                terminationFuture,
+                version,
+                maximumFramesize,
+                forceSerialization,
+                flinkClassLoader);
     }
 
     @Override

Reply via email to