This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ce71fb5e7e8b7a5421cd4bd15264273facd9986d
Author: Till Rohrmann <[email protected]>
AuthorDate: Fri Feb 22 12:21:31 2019 +0100

    [hotfix] Factor logic out of AkkaRpcActor#handleRpcInvocation to reduce 
size of method
---
 .../flink/runtime/rpc/akka/AkkaRpcActor.java       | 82 ++++++++++++----------
 1 file changed, 45 insertions(+), 37 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
index 6dd9a22..b434001 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
@@ -281,45 +281,13 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> 
extends UntypedActor {
                                                return;
                                        }
 
-                                       final boolean isRemoteSender = 
isRemoteSender();
                                        final String methodName = 
rpcMethod.getName();
 
                                        if (result instanceof 
CompletableFuture) {
-                                               final CompletableFuture<?> 
future = (CompletableFuture<?>) result;
-                                               Promise.DefaultPromise<Object> 
promise = new Promise.DefaultPromise<>();
-
-                                               future.whenComplete(
-                                                       (value, throwable) -> {
-                                                               if (throwable 
!= null) {
-                                                                       
promise.failure(throwable);
-                                                               } else {
-                                                                       if 
(isRemoteSender) {
-                                                                               
Either<SerializedValue<?>, AkkaRpcException> serializedResult = 
serializeRemoteResultAndVerifySize(value, methodName);
-
-                                                                               
if (serializedResult.isLeft()) {
-                                                                               
        promise.success(serializedResult.left());
-                                                                               
} else {
-                                                                               
        promise.failure(serializedResult.right());
-                                                                               
}
-                                                                       } else {
-                                                                               
promise.success(value);
-                                                                       }
-                                                               }
-                                                       });
-
-                                               Patterns.pipe(promise.future(), 
getContext().dispatcher()).to(getSender());
+                                               final CompletableFuture<?> 
responseFuture = (CompletableFuture<?>) result;
+                                               
sendAsyncResponse(responseFuture, methodName);
                                        } else {
-                                               if (isRemoteSender) {
-                                                       
Either<SerializedValue<?>, AkkaRpcException> serializedResult = 
serializeRemoteResultAndVerifySize(result, methodName);
-
-                                                       if 
(serializedResult.isLeft()) {
-                                                               
getSender().tell(new Status.Success(serializedResult.left()), getSelf());
-                                                       } else {
-                                                               
getSender().tell(new Status.Failure(serializedResult.right()), getSelf());
-                                                       }
-                                               } else {
-                                                       getSender().tell(new 
Status.Success(result), getSelf());
-                                               }
+                                               sendSyncResponse(result, 
methodName);
                                        }
                                }
                        } catch (Throwable e) {
@@ -330,8 +298,48 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> 
extends UntypedActor {
                }
        }
 
-       private boolean isRemoteSender() {
-               return !getSender().path().address().hasLocalScope();
+       private void sendSyncResponse(Object response, String methodName) {
+               if (isRemoteSender(getSender())) {
+                       Either<SerializedValue<?>, AkkaRpcException> 
serializedResult = serializeRemoteResultAndVerifySize(response, methodName);
+
+                       if (serializedResult.isLeft()) {
+                               getSender().tell(new 
Status.Success(serializedResult.left()), getSelf());
+                       } else {
+                               getSender().tell(new 
Status.Failure(serializedResult.right()), getSelf());
+                       }
+               } else {
+                       getSender().tell(new Status.Success(response), 
getSelf());
+               }
+       }
+
+       private void sendAsyncResponse(CompletableFuture<?> asyncResponse, 
String methodName) {
+               final ActorRef sender = getSender();
+               Promise.DefaultPromise<Object> promise = new 
Promise.DefaultPromise<>();
+
+               asyncResponse.whenComplete(
+                       (value, throwable) -> {
+                               if (throwable != null) {
+                                       promise.failure(throwable);
+                               } else {
+                                       if (isRemoteSender(sender)) {
+                                               Either<SerializedValue<?>, 
AkkaRpcException> serializedResult = serializeRemoteResultAndVerifySize(value, 
methodName);
+
+                                               if (serializedResult.isLeft()) {
+                                                       
promise.success(serializedResult.left());
+                                               } else {
+                                                       
promise.failure(serializedResult.right());
+                                               }
+                                       } else {
+                                               promise.success(value);
+                                       }
+                               }
+                       });
+
+               Patterns.pipe(promise.future(), 
getContext().dispatcher()).to(sender);
+       }
+
+       private boolean isRemoteSender(ActorRef sender) {
+               return !sender.path().address().hasLocalScope();
        }
 
        private Either<SerializedValue<?>, AkkaRpcException> 
serializeRemoteResultAndVerifySize(Object result, String methodName) {

Reply via email to