This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch release-1.8 in repository https://gitbox.apache.org/repos/asf/flink.git
commit fe7b11a9d9c9a74d136efa0adb58a0a0bf152f1b Author: Till Rohrmann <[email protected]> AuthorDate: Fri Feb 22 12:21:31 2019 +0100 [hotfix] Factor logic out of AkkaRpcActor#handleRpcInvocation to reduce size of method --- .../flink/runtime/rpc/akka/AkkaRpcActor.java | 82 ++++++++++++---------- 1 file changed, 45 insertions(+), 37 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java index 6dd9a22..b434001 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java @@ -281,45 +281,13 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends UntypedActor { return; } - final boolean isRemoteSender = isRemoteSender(); final String methodName = rpcMethod.getName(); if (result instanceof CompletableFuture) { - final CompletableFuture<?> future = (CompletableFuture<?>) result; - Promise.DefaultPromise<Object> promise = new Promise.DefaultPromise<>(); - - future.whenComplete( - (value, throwable) -> { - if (throwable != null) { - promise.failure(throwable); - } else { - if (isRemoteSender) { - Either<SerializedValue<?>, AkkaRpcException> serializedResult = serializeRemoteResultAndVerifySize(value, methodName); - - if (serializedResult.isLeft()) { - promise.success(serializedResult.left()); - } else { - promise.failure(serializedResult.right()); - } - } else { - promise.success(value); - } - } - }); - - Patterns.pipe(promise.future(), getContext().dispatcher()).to(getSender()); + final CompletableFuture<?> responseFuture = (CompletableFuture<?>) result; + sendAsyncResponse(responseFuture, methodName); } else { - if (isRemoteSender) { - Either<SerializedValue<?>, AkkaRpcException> serializedResult = serializeRemoteResultAndVerifySize(result, methodName); - - if (serializedResult.isLeft()) { - getSender().tell(new Status.Success(serializedResult.left()), getSelf()); - } else { - getSender().tell(new Status.Failure(serializedResult.right()), getSelf()); - } - } else { - getSender().tell(new Status.Success(result), getSelf()); - } + sendSyncResponse(result, methodName); } } } catch (Throwable e) { @@ -330,8 +298,48 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends UntypedActor { } } - private boolean isRemoteSender() { - return !getSender().path().address().hasLocalScope(); + private void sendSyncResponse(Object response, String methodName) { + if (isRemoteSender(getSender())) { + Either<SerializedValue<?>, AkkaRpcException> serializedResult = serializeRemoteResultAndVerifySize(response, methodName); + + if (serializedResult.isLeft()) { + getSender().tell(new Status.Success(serializedResult.left()), getSelf()); + } else { + getSender().tell(new Status.Failure(serializedResult.right()), getSelf()); + } + } else { + getSender().tell(new Status.Success(response), getSelf()); + } + } + + private void sendAsyncResponse(CompletableFuture<?> asyncResponse, String methodName) { + final ActorRef sender = getSender(); + Promise.DefaultPromise<Object> promise = new Promise.DefaultPromise<>(); + + asyncResponse.whenComplete( + (value, throwable) -> { + if (throwable != null) { + promise.failure(throwable); + } else { + if (isRemoteSender(sender)) { + Either<SerializedValue<?>, AkkaRpcException> serializedResult = serializeRemoteResultAndVerifySize(value, methodName); + + if (serializedResult.isLeft()) { + promise.success(serializedResult.left()); + } else { + promise.failure(serializedResult.right()); + } + } else { + promise.success(value); + } + } + }); + + Patterns.pipe(promise.future(), getContext().dispatcher()).to(sender); + } + + private boolean isRemoteSender(ActorRef sender) { + return !sender.path().address().hasLocalScope(); } private Either<SerializedValue<?>, AkkaRpcException> serializeRemoteResultAndVerifySize(Object result, String methodName) {
