This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit e932aad8ff5ac953f1e73193dda22da9f242851e Author: Chesnay Schepler <[email protected]> AuthorDate: Fri Oct 29 15:34:53 2021 +0200 [FLINK-24706][rpc] Forward deserialization errors to returned future --- .../runtime/rpc/akka/AkkaInvocationHandler.java | 13 ++-- .../flink/runtime/rpc/akka/AkkaRpcActorTest.java | 76 ++++++++++++++++++++++ 2 files changed, 85 insertions(+), 4 deletions(-) diff --git a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java index b4fd6ab..db73771 100644 --- a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java +++ b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java @@ -237,7 +237,10 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaBasedEndpoint, Rpc final Throwable callStackCapture = captureAskCallStack ? new Throwable() : null; // execute an asynchronous call - final CompletableFuture<?> resultFuture = ask(rpcInvocation, futureTimeout); + final CompletableFuture<?> resultFuture = + ask(rpcInvocation, futureTimeout) + .thenApply( + resultValue -> deserializeValueIfNeeded(resultValue, method)); final CompletableFuture<Object> completableFuture = new CompletableFuture<>(); resultFuture.whenComplete( @@ -245,10 +248,12 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaBasedEndpoint, Rpc if (failure != null) { completableFuture.completeExceptionally( resolveTimeoutException( - failure, callStackCapture, address, rpcInvocation)); + ExceptionUtils.stripCompletionException(failure), + callStackCapture, + address, + rpcInvocation)); } else { - completableFuture.complete( - deserializeValueIfNeeded(resultValue, method)); + completableFuture.complete(resultValue); } }); diff --git a/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java b/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java index e99b571..29f973e 100644 --- a/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java +++ b/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java @@ -32,6 +32,7 @@ import org.apache.flink.runtime.rpc.RpcUtils; import org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException; import org.apache.flink.runtime.rpc.exceptions.RecipientUnreachableException; import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException; +import org.apache.flink.runtime.rpc.exceptions.RpcException; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; import org.apache.flink.util.FlinkRuntimeException; @@ -53,6 +54,8 @@ import org.slf4j.LoggerFactory; import javax.annotation.Nullable; import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.Serializable; import java.io.UncheckedIOException; import java.time.Duration; import java.util.concurrent.CompletableFuture; @@ -237,6 +240,53 @@ public class AkkaRpcActorTest extends TestLogger { } } + /** + * Tests that the AkkaInvocationHandler properly fails the returned future if the response + * cannot be deserialized. + */ + @Test + public void testResultFutureFailsOnDeserializationError() throws Exception { + // setup 2 actor systems and rpc services that support remote connections (for which RPCs go + // through serialization) + final AkkaRpcService serverAkkaRpcService = + new AkkaRpcService( + AkkaUtils.createActorSystem( + "serverActorSystem", + AkkaUtils.getAkkaConfig( + new Configuration(), new HostAndPort("localhost", 0))), + AkkaRpcServiceConfiguration.defaultConfiguration()); + + final AkkaRpcService clientAkkaRpcService = + new AkkaRpcService( + AkkaUtils.createActorSystem( + "clientActorSystem", + AkkaUtils.getAkkaConfig( + new Configuration(), new HostAndPort("localhost", 0))), + AkkaRpcServiceConfiguration.defaultConfiguration()); + + try { + final DeserializatonFailingEndpoint rpcEndpoint = + new DeserializatonFailingEndpoint(serverAkkaRpcService); + rpcEndpoint.start(); + + final DeserializatonFailingGateway rpcGateway = + rpcEndpoint.getSelfGateway(DeserializatonFailingGateway.class); + + final DeserializatonFailingGateway connect = + clientAkkaRpcService + .connect(rpcGateway.getAddress(), DeserializatonFailingGateway.class) + .get(); + + assertThat( + connect.doStuff(), + FlinkMatchers.futureWillCompleteExceptionally( + RpcException.class, Duration.ofHours(1))); + } finally { + RpcUtils.terminateRpcService(clientAkkaRpcService, timeout); + RpcUtils.terminateRpcService(serverAkkaRpcService, timeout); + } + } + /** Tests that exception thrown in the onStop method are returned by the termination future. */ @Test public void testOnStopExceptionPropagation() throws Exception { @@ -748,6 +798,32 @@ public class AkkaRpcActorTest extends TestLogger { // ------------------------------------------------------------------------ + private interface DeserializatonFailingGateway extends RpcGateway { + CompletableFuture<DeserializationFailingObject> doStuff(); + } + + private static class DeserializatonFailingEndpoint extends RpcEndpoint + implements DeserializatonFailingGateway { + + protected DeserializatonFailingEndpoint(RpcService rpcService) { + super(rpcService); + } + + @Override + public CompletableFuture<DeserializationFailingObject> doStuff() { + return CompletableFuture.completedFuture(new DeserializationFailingObject()); + } + } + + private static class DeserializationFailingObject implements Serializable { + private void readObject(ObjectInputStream aInputStream) + throws ClassNotFoundException, IOException { + throw new ClassNotFoundException("test exception"); + } + } + + // ------------------------------------------------------------------------ + private static class SimpleRpcEndpoint extends RpcEndpoint implements RpcGateway { protected SimpleRpcEndpoint(RpcService rpcService, String endpointId) {
