[FLINK-5239] [distributed coordination] RPC service properly unpacks 
'InvocationTargetExceptions'


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/887cbb90
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/887cbb90
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/887cbb90

Branch: refs/heads/flip-6
Commit: 887cbb9095af92e4788c06ba0307cc9db5c5b948
Parents: 44fc46d
Author: Stephan Ewen <[email protected]>
Authored: Fri Dec 2 18:49:21 2016 +0100
Committer: Stephan Ewen <[email protected]>
Committed: Mon Dec 5 02:49:43 2016 +0100

----------------------------------------------------------------------
 .../flink/runtime/rpc/akka/AkkaRpcActor.java    | 16 +++-
 .../runtime/rpc/akka/AkkaRpcActorTest.java      | 89 +++++++++++++++++++-
 2 files changed, 102 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/887cbb90/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
index fe6b23b..264ba96 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
@@ -44,6 +44,7 @@ import org.slf4j.LoggerFactory;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
@@ -180,8 +181,19 @@ class AkkaRpcActor<C extends RpcGateway, T extends 
RpcEndpoint<C>> extends Untyp
                                if 
(rpcMethod.getReturnType().equals(Void.TYPE)) {
                                        // No return value to send back
                                        rpcMethod.invoke(rpcEndpoint, 
rpcInvocation.getArgs());
-                               } else {
-                                       Object result = 
rpcMethod.invoke(rpcEndpoint, rpcInvocation.getArgs());
+                               }
+                               else {
+                                       final Object result;
+                                       try {
+                                               result = 
rpcMethod.invoke(rpcEndpoint, rpcInvocation.getArgs());
+                                       }
+                                       catch (InvocationTargetException e) {
+                                               LOG.trace("Reporting back error 
thrown in remote procedure {}", rpcMethod, e);
+
+                                               // tell the sender about the 
failure
+                                               getSender().tell(new 
Status.Failure(e.getTargetException()), getSelf());
+                                               return;
+                                       }
 
                                        if (result instanceof Future) {
                                                final Future<?> future = 
(Future<?>) result;

http://git-wip-us.apache.org/repos/asf/flink/blob/887cbb90/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
index 760e1a7..c73240c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
@@ -22,6 +22,7 @@ import akka.actor.ActorSystem;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcGateway;
@@ -30,6 +31,7 @@ import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException;
 import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
 import org.apache.flink.util.TestLogger;
+
 import org.hamcrest.core.Is;
 import org.junit.AfterClass;
 import org.junit.Test;
@@ -86,7 +88,7 @@ public class AkkaRpcActorTest extends TestLogger {
                Future<DummyRpcGateway> futureRpcGateway = 
akkaRpcService.connect("foobar", DummyRpcGateway.class);
 
                try {
-                       DummyRpcGateway gateway = 
futureRpcGateway.get(timeout.getSize(), timeout.getUnit());
+                       futureRpcGateway.get(timeout.getSize(), 
timeout.getUnit());
 
                        fail("The rpc connection resolution should have 
failed.");
                } catch (ExecutionException exception) {
@@ -192,6 +194,48 @@ public class AkkaRpcActorTest extends TestLogger {
                terminationFuture.get();
        }
 
+       @Test
+       public void testExceptionPropagation() throws Exception {
+               ExceptionalEndpoint rpcEndpoint = new 
ExceptionalEndpoint(akkaRpcService);
+               rpcEndpoint.start();
+
+               ExceptionalGateway rpcGateway = rpcEndpoint.getSelf();
+               Future<Integer> result = rpcGateway.doStuff();
+
+               try {
+                       result.get(timeout.getSize(), timeout.getUnit());
+                       fail("this should fail with an exception");
+               }
+               catch (ExecutionException e) {
+                       Throwable cause = e.getCause();
+                       assertEquals(RuntimeException.class, cause.getClass());
+                       assertEquals("my super specific test exception", 
cause.getMessage());
+               }
+       }
+
+       @Test
+       public void testExceptionPropagationFuturePiping() throws Exception {
+               ExceptionalFutureEndpoint rpcEndpoint = new 
ExceptionalFutureEndpoint(akkaRpcService);
+               rpcEndpoint.start();
+
+               ExceptionalGateway rpcGateway = rpcEndpoint.getSelf();
+               Future<Integer> result = rpcGateway.doStuff();
+
+               try {
+                       result.get(timeout.getSize(), timeout.getUnit());
+                       fail("this should fail with an exception");
+               }
+               catch (ExecutionException e) {
+                       Throwable cause = e.getCause();
+                       assertEquals(Exception.class, cause.getClass());
+                       assertEquals("some test", cause.getMessage());
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Test Actors and Interfaces
+       // 
------------------------------------------------------------------------
+
        private interface DummyRpcGateway extends RpcGateway {
                Future<Integer> foobar();
        }
@@ -218,4 +262,47 @@ public class AkkaRpcActorTest extends TestLogger {
                        _foobar = value;
                }
        }
+
+       // 
------------------------------------------------------------------------
+
+       private interface ExceptionalGateway extends RpcGateway {
+               Future<Integer> doStuff();
+       }
+
+       private static class ExceptionalEndpoint extends 
RpcEndpoint<ExceptionalGateway> {
+
+               protected ExceptionalEndpoint(RpcService rpcService) {
+                       super(rpcService);
+               }
+
+               @RpcMethod
+               public int doStuff() {
+                       throw new RuntimeException("my super specific test 
exception");
+               }
+       }
+
+       private static class ExceptionalFutureEndpoint extends 
RpcEndpoint<ExceptionalGateway> {
+
+               protected ExceptionalFutureEndpoint(RpcService rpcService) {
+                       super(rpcService);
+               }
+
+               @RpcMethod
+               public Future<Integer> doStuff() {
+                       final FlinkCompletableFuture<Integer> future = new 
FlinkCompletableFuture<>();
+
+                       // complete the future slightly in the, well, future...
+                       new Thread() {
+                               @Override
+                               public void run() {
+                                       try {
+                                               Thread.sleep(10);
+                                       } catch (InterruptedException ignored) 
{}
+                                       future.completeExceptionally(new 
Exception("some test"));
+                               }
+                       }.start();
+
+                       return future;
+               }
+       }
 }

Reply via email to