[FLINK-5239] [distributed coordination] RPC service properly unpacks 'InvocationTargetExceptions'
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/887cbb90 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/887cbb90 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/887cbb90 Branch: refs/heads/flip-6 Commit: 887cbb9095af92e4788c06ba0307cc9db5c5b948 Parents: 44fc46d Author: Stephan Ewen <[email protected]> Authored: Fri Dec 2 18:49:21 2016 +0100 Committer: Stephan Ewen <[email protected]> Committed: Mon Dec 5 02:49:43 2016 +0100 ---------------------------------------------------------------------- .../flink/runtime/rpc/akka/AkkaRpcActor.java | 16 +++- .../runtime/rpc/akka/AkkaRpcActorTest.java | 89 +++++++++++++++++++- 2 files changed, 102 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/887cbb90/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java index fe6b23b..264ba96 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java @@ -44,6 +44,7 @@ import org.slf4j.LoggerFactory; import scala.concurrent.duration.FiniteDuration; import java.io.IOException; +import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; @@ -180,8 +181,19 @@ class AkkaRpcActor<C extends RpcGateway, T extends RpcEndpoint<C>> extends Untyp if (rpcMethod.getReturnType().equals(Void.TYPE)) { // No return value to send back rpcMethod.invoke(rpcEndpoint, rpcInvocation.getArgs()); - } else { - Object result = rpcMethod.invoke(rpcEndpoint, rpcInvocation.getArgs()); + } + else { + final Object result; + try { + result = rpcMethod.invoke(rpcEndpoint, rpcInvocation.getArgs()); + } + catch (InvocationTargetException e) { + LOG.trace("Reporting back error thrown in remote procedure {}", rpcMethod, e); + + // tell the sender about the failure + getSender().tell(new Status.Failure(e.getTargetException()), getSelf()); + return; + } if (result instanceof Future) { final Future<?> future = (Future<?>) result; http://git-wip-us.apache.org/repos/asf/flink/blob/887cbb90/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java index 760e1a7..c73240c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java @@ -22,6 +22,7 @@ import akka.actor.ActorSystem; import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; import org.apache.flink.runtime.concurrent.impl.FlinkFuture; import org.apache.flink.runtime.rpc.RpcEndpoint; import org.apache.flink.runtime.rpc.RpcGateway; @@ -30,6 +31,7 @@ import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException; import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException; import org.apache.flink.util.TestLogger; + import org.hamcrest.core.Is; import org.junit.AfterClass; import org.junit.Test; @@ -86,7 +88,7 @@ public class AkkaRpcActorTest extends TestLogger { Future<DummyRpcGateway> futureRpcGateway = akkaRpcService.connect("foobar", DummyRpcGateway.class); try { - DummyRpcGateway gateway = futureRpcGateway.get(timeout.getSize(), timeout.getUnit()); + futureRpcGateway.get(timeout.getSize(), timeout.getUnit()); fail("The rpc connection resolution should have failed."); } catch (ExecutionException exception) { @@ -192,6 +194,48 @@ public class AkkaRpcActorTest extends TestLogger { terminationFuture.get(); } + @Test + public void testExceptionPropagation() throws Exception { + ExceptionalEndpoint rpcEndpoint = new ExceptionalEndpoint(akkaRpcService); + rpcEndpoint.start(); + + ExceptionalGateway rpcGateway = rpcEndpoint.getSelf(); + Future<Integer> result = rpcGateway.doStuff(); + + try { + result.get(timeout.getSize(), timeout.getUnit()); + fail("this should fail with an exception"); + } + catch (ExecutionException e) { + Throwable cause = e.getCause(); + assertEquals(RuntimeException.class, cause.getClass()); + assertEquals("my super specific test exception", cause.getMessage()); + } + } + + @Test + public void testExceptionPropagationFuturePiping() throws Exception { + ExceptionalFutureEndpoint rpcEndpoint = new ExceptionalFutureEndpoint(akkaRpcService); + rpcEndpoint.start(); + + ExceptionalGateway rpcGateway = rpcEndpoint.getSelf(); + Future<Integer> result = rpcGateway.doStuff(); + + try { + result.get(timeout.getSize(), timeout.getUnit()); + fail("this should fail with an exception"); + } + catch (ExecutionException e) { + Throwable cause = e.getCause(); + assertEquals(Exception.class, cause.getClass()); + assertEquals("some test", cause.getMessage()); + } + } + + // ------------------------------------------------------------------------ + // Test Actors and Interfaces + // ------------------------------------------------------------------------ + private interface DummyRpcGateway extends RpcGateway { Future<Integer> foobar(); } @@ -218,4 +262,47 @@ public class AkkaRpcActorTest extends TestLogger { _foobar = value; } } + + // ------------------------------------------------------------------------ + + private interface ExceptionalGateway extends RpcGateway { + Future<Integer> doStuff(); + } + + private static class ExceptionalEndpoint extends RpcEndpoint<ExceptionalGateway> { + + protected ExceptionalEndpoint(RpcService rpcService) { + super(rpcService); + } + + @RpcMethod + public int doStuff() { + throw new RuntimeException("my super specific test exception"); + } + } + + private static class ExceptionalFutureEndpoint extends RpcEndpoint<ExceptionalGateway> { + + protected ExceptionalFutureEndpoint(RpcService rpcService) { + super(rpcService); + } + + @RpcMethod + public Future<Integer> doStuff() { + final FlinkCompletableFuture<Integer> future = new FlinkCompletableFuture<>(); + + // complete the future slightly in the, well, future... + new Thread() { + @Override + public void run() { + try { + Thread.sleep(10); + } catch (InterruptedException ignored) {} + future.completeExceptionally(new Exception("some test")); + } + }.start(); + + return future; + } + } }
