Repository: aries-rsa Updated Branches: refs/heads/master 1a9c55bbd -> 1c5922ce7
[ARIES-1757] Add support for Future as return value Project: http://git-wip-us.apache.org/repos/asf/aries-rsa/repo Commit: http://git-wip-us.apache.org/repos/asf/aries-rsa/commit/1c5922ce Tree: http://git-wip-us.apache.org/repos/asf/aries-rsa/tree/1c5922ce Diff: http://git-wip-us.apache.org/repos/asf/aries-rsa/diff/1c5922ce Branch: refs/heads/master Commit: 1c5922ce7807adb53e5dbaa282fc744c03048ca0 Parents: 1a9c55b Author: Christian Schneider <[email protected]> Authored: Wed Nov 22 17:57:55 2017 +0100 Committer: Christian Schneider <[email protected]> Committed: Wed Nov 22 17:57:55 2017 +0100 ---------------------------------------------------------------------- .../aries/rsa/provider/tcp/TCPServer.java | 27 +++++++++---- .../rsa/provider/tcp/TcpInvocationHandler.java | 41 +++++++++++++++----- .../aries/rsa/provider/tcp/TcpProviderTest.java | 24 ++++++++---- .../rsa/provider/tcp/myservice/MyService.java | 3 ++ .../provider/tcp/myservice/MyServiceImpl.java | 19 ++++++++- 5 files changed, 89 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/1c5922ce/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TCPServer.java ---------------------------------------------------------------------- diff --git a/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TCPServer.java b/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TCPServer.java index 1daf546..85a7c31 100644 --- a/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TCPServer.java +++ b/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TCPServer.java @@ -22,11 +22,13 @@ import java.io.Closeable; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; +import java.lang.reflect.InvocationTargetException; import java.net.ServerSocket; import java.net.Socket; import java.net.SocketException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import org.slf4j.Logger; @@ -63,14 +65,11 @@ public class TCPServer implements Closeable, Runnable { ClassLoader serviceCL = service.getClass().getClassLoader(); while (running) { try ( - Socket socket = this.serverSocket.accept(); - ObjectInputStream ois = new LoaderObjectInputStream(socket.getInputStream(), serviceCL); - ObjectOutputStream objectOutput = new ObjectOutputStream(socket.getOutputStream()) + Socket socket = this.serverSocket.accept(); + ObjectInputStream ois = new LoaderObjectInputStream(socket.getInputStream(), serviceCL); + ObjectOutputStream objectOutput = new ObjectOutputStream(socket.getOutputStream()) ) { - String methodName = (String)ois.readObject(); - Object[] args = (Object[])ois.readObject(); - Object result = invoker.invoke(methodName, args); - objectOutput.writeObject(result); + handleCall(ois, objectOutput); } catch (SocketException e) { running = false; } catch (Exception e) { @@ -79,7 +78,19 @@ public class TCPServer implements Closeable, Runnable { } } - + @SuppressWarnings("unchecked") + private void handleCall(ObjectInputStream ois, ObjectOutputStream objectOutput) throws Exception { + String methodName = (String)ois.readObject(); + Object[] args = (Object[])ois.readObject(); + Object result = invoker.invoke(methodName, args); + if (result instanceof InvocationTargetException) { + result = ((InvocationTargetException) result).getCause(); + } else if (result instanceof Future) { + Future<Object> fu = (Future<Object>) result; + result = fu.get(); + } + objectOutput.writeObject(result); + } @Override public void close() throws IOException { http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/1c5922ce/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpInvocationHandler.java ---------------------------------------------------------------------- diff --git a/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpInvocationHandler.java b/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpInvocationHandler.java index 3206339..8dea17f 100644 --- a/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpInvocationHandler.java +++ b/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpInvocationHandler.java @@ -25,6 +25,9 @@ import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.net.Socket; import java.net.UnknownHostException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Future; +import java.util.function.Supplier; public class TcpInvocationHandler implements InvocationHandler { private String host; @@ -42,6 +45,27 @@ public class TcpInvocationHandler implements InvocationHandler { @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { + if (Future.class.isAssignableFrom(method.getReturnType())) { + return createAsyncResult(method, args); + } else { + return handleSyncCall(method, args); + } + } + + private Object createAsyncResult(final Method method, final Object[] args) { + return CompletableFuture.supplyAsync(new Supplier<Object>() { + public Object get() { + try { + return handleSyncCall(method, args); + } catch (Throwable e) { + throw new RuntimeException(e); + } + } + }); + } + + private Object handleSyncCall(Method method, Object[] args) throws Throwable { + Object result; try ( Socket socket = new Socket(this.host, this.port); ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream()) @@ -50,20 +74,19 @@ public class TcpInvocationHandler implements InvocationHandler { out.writeObject(method.getName()); out.writeObject(args); out.flush(); - return parseResult(socket); - } catch (Exception e) { + result = parseResult(socket); + } catch (Throwable e) { throw new RuntimeException("Error calling " + host + ":" + port + " method: " + method.getName(), e); } + if (result instanceof Throwable) { + throw (Throwable)result; + } + return result; } - private Object parseResult(Socket socket) throws IOException, ClassNotFoundException, Throwable { + private Object parseResult(Socket socket) throws Throwable { try (ObjectInputStream in = new LoaderObjectInputStream(socket.getInputStream(), cl)) { - Object result = in.readObject(); - if (result instanceof Throwable) { - throw (Throwable)result; - } else { - return result; - } + return in.readObject(); } } http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/1c5922ce/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/TcpProviderTest.java ---------------------------------------------------------------------- diff --git a/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/TcpProviderTest.java b/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/TcpProviderTest.java index c9d69d2..fab0129 100644 --- a/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/TcpProviderTest.java +++ b/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/TcpProviderTest.java @@ -19,6 +19,7 @@ package org.apache.aries.rsa.provider.tcp; import static org.hamcrest.core.StringStartsWith.startsWith; +import static org.junit.Assert.assertEquals; import java.io.IOException; import java.net.SocketTimeoutException; @@ -28,6 +29,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import org.apache.aries.rsa.provider.tcp.myservice.MyService; @@ -43,6 +45,7 @@ import org.osgi.framework.BundleContext; public class TcpProviderTest { + private static final int TIMEOUT = 200; private static final int NUM_CALLS = 100; private static MyService myServiceProxy; private static Endpoint ep; @@ -55,7 +58,7 @@ public class TcpProviderTest { EndpointHelper.addObjectClass(props, exportedInterfaces); props.put("aries.rsa.hostname", "localhost"); props.put("aries.rsa.numThreads", "10"); - props.put("osgi.basic.timeout", 100); + props.put("osgi.basic.timeout", TIMEOUT); MyService myService = new MyServiceImpl(); BundleContext bc = EasyMock.mock(BundleContext.class); ep = provider.exportService(myService, bc, props, exportedInterfaces); @@ -68,7 +71,7 @@ public class TcpProviderTest { } @Test - public void testCallTimeout() throws IOException { + public void testCallTimeout() { try { myServiceProxy.call("slow"); Assert.fail("Expecting timeout"); @@ -78,25 +81,25 @@ public class TcpProviderTest { } @Test - public void testPerf() throws IOException, InterruptedException { + public void testPerf() throws InterruptedException { runPerfTest(myServiceProxy); String msg = "test"; String result = myServiceProxy.echo(msg); Assert.assertEquals(msg, result); } - @Test(expected=RuntimeException.class) - public void testCallException() throws IOException, InterruptedException { + @Test(expected=IllegalArgumentException.class) + public void testCallException() { myServiceProxy.call("throw exception"); } @Test - public void testCall() throws IOException, InterruptedException { + public void testCall() { myServiceProxy.echo("test"); } @Test - public void testCallOneway() throws IOException, InterruptedException { + public void testCallOneway() { myServiceProxy.callOneWay("test"); } @@ -109,6 +112,13 @@ public class TcpProviderTest { myServiceProxy.callWithList(msgList); } + @Test + public void testAsyncCall() throws Exception { + Future<String> result = myServiceProxy.callAsync(100); + String answer = result.get(1, TimeUnit.SECONDS); + assertEquals("Finished", answer); + } + @AfterClass public static void close() throws IOException { ep.close(); http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/1c5922ce/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/MyService.java ---------------------------------------------------------------------- diff --git a/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/MyService.java b/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/MyService.java index 6a791c9..da3b10c 100644 --- a/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/MyService.java +++ b/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/MyService.java @@ -19,6 +19,7 @@ package org.apache.aries.rsa.provider.tcp.myservice; import java.util.List; +import java.util.concurrent.Future; import javax.jws.Oneway; @@ -32,5 +33,7 @@ public interface MyService { void callOneWay(String msg); void callWithList(List<String> msg); + + Future<String> callAsync(int delay); } http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/1c5922ce/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/MyServiceImpl.java ---------------------------------------------------------------------- diff --git a/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/MyServiceImpl.java b/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/MyServiceImpl.java index a39e155..0e4d88a 100644 --- a/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/MyServiceImpl.java +++ b/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/MyServiceImpl.java @@ -18,7 +18,11 @@ */ package org.apache.aries.rsa.provider.tcp.myservice; +import static java.util.concurrent.CompletableFuture.supplyAsync; + import java.util.List; +import java.util.concurrent.Future; +import java.util.function.Supplier; public class MyServiceImpl implements MyService { @@ -34,7 +38,7 @@ public class MyServiceImpl implements MyService { } if ("slow".equals(msg)) { try { - Thread.sleep(200); + Thread.sleep(300); } catch (InterruptedException e) { } } @@ -49,5 +53,18 @@ public class MyServiceImpl implements MyService { } + @Override + public Future<String> callAsync(final int delay) { + return supplyAsync(new Supplier<String>() { + public String get() { + try { + Thread.sleep(delay); + } catch (InterruptedException e) { + } + return "Finished"; + } + + }); + } }
