Repository: aries-rsa
Updated Branches:
  refs/heads/master 1a9c55bbd -> 1c5922ce7


[ARIES-1757] Add support for Future as return value


Project: http://git-wip-us.apache.org/repos/asf/aries-rsa/repo
Commit: http://git-wip-us.apache.org/repos/asf/aries-rsa/commit/1c5922ce
Tree: http://git-wip-us.apache.org/repos/asf/aries-rsa/tree/1c5922ce
Diff: http://git-wip-us.apache.org/repos/asf/aries-rsa/diff/1c5922ce

Branch: refs/heads/master
Commit: 1c5922ce7807adb53e5dbaa282fc744c03048ca0
Parents: 1a9c55b
Author: Christian Schneider <[email protected]>
Authored: Wed Nov 22 17:57:55 2017 +0100
Committer: Christian Schneider <[email protected]>
Committed: Wed Nov 22 17:57:55 2017 +0100

----------------------------------------------------------------------
 .../aries/rsa/provider/tcp/TCPServer.java       | 27 +++++++++----
 .../rsa/provider/tcp/TcpInvocationHandler.java  | 41 +++++++++++++++-----
 .../aries/rsa/provider/tcp/TcpProviderTest.java | 24 ++++++++----
 .../rsa/provider/tcp/myservice/MyService.java   |  3 ++
 .../provider/tcp/myservice/MyServiceImpl.java   | 19 ++++++++-
 5 files changed, 89 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/1c5922ce/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TCPServer.java
----------------------------------------------------------------------
diff --git 
a/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TCPServer.java 
b/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TCPServer.java
index 1daf546..85a7c31 100644
--- 
a/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TCPServer.java
+++ 
b/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TCPServer.java
@@ -22,11 +22,13 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.lang.reflect.InvocationTargetException;
 import java.net.ServerSocket;
 import java.net.Socket;
 import java.net.SocketException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
 import org.slf4j.Logger;
@@ -63,14 +65,11 @@ public class TCPServer implements Closeable, Runnable {
         ClassLoader serviceCL = service.getClass().getClassLoader();
         while (running) {
             try (
-                Socket socket = this.serverSocket.accept();
-                ObjectInputStream ois = new 
LoaderObjectInputStream(socket.getInputStream(), serviceCL);
-                ObjectOutputStream objectOutput = new 
ObjectOutputStream(socket.getOutputStream())
+                    Socket socket = this.serverSocket.accept();
+                    ObjectInputStream ois = new 
LoaderObjectInputStream(socket.getInputStream(), serviceCL);
+                    ObjectOutputStream objectOutput = new 
ObjectOutputStream(socket.getOutputStream())
                 ) {
-                String methodName = (String)ois.readObject();
-                Object[] args = (Object[])ois.readObject();
-                Object result = invoker.invoke(methodName, args);
-                objectOutput.writeObject(result);
+                handleCall(ois, objectOutput);
             } catch (SocketException e) {
                 running = false;
             } catch (Exception e) {
@@ -79,7 +78,19 @@ public class TCPServer implements Closeable, Runnable {
         }
     }
 
-
+    @SuppressWarnings("unchecked")
+    private void handleCall(ObjectInputStream ois, ObjectOutputStream 
objectOutput) throws Exception {
+        String methodName = (String)ois.readObject();
+        Object[] args = (Object[])ois.readObject();
+        Object result = invoker.invoke(methodName, args);
+        if (result instanceof InvocationTargetException) {
+            result = ((InvocationTargetException) result).getCause();
+        } else if (result instanceof Future) {
+            Future<Object> fu = (Future<Object>) result;
+            result = fu.get();
+        }
+        objectOutput.writeObject(result);
+    }
 
     @Override
     public void close() throws IOException {

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/1c5922ce/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpInvocationHandler.java
----------------------------------------------------------------------
diff --git 
a/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpInvocationHandler.java
 
b/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpInvocationHandler.java
index 3206339..8dea17f 100644
--- 
a/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpInvocationHandler.java
+++ 
b/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpInvocationHandler.java
@@ -25,6 +25,9 @@ import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.Method;
 import java.net.Socket;
 import java.net.UnknownHostException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+import java.util.function.Supplier;
 
 public class TcpInvocationHandler implements InvocationHandler {
     private String host;
@@ -42,6 +45,27 @@ public class TcpInvocationHandler implements 
InvocationHandler {
 
     @Override
     public Object invoke(Object proxy, Method method, Object[] args) throws 
Throwable {
+        if (Future.class.isAssignableFrom(method.getReturnType())) {
+            return createAsyncResult(method, args);
+        } else {
+            return handleSyncCall(method, args);
+        }
+    }
+
+    private Object createAsyncResult(final Method method, final Object[] args) 
{
+        return CompletableFuture.supplyAsync(new Supplier<Object>() {
+            public Object get() {
+                try {
+                    return handleSyncCall(method, args);
+                } catch (Throwable e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        });
+    }
+
+    private Object handleSyncCall(Method method, Object[] args) throws 
Throwable {
+        Object result;
         try (
                 Socket socket = new Socket(this.host, this.port);
                 ObjectOutputStream out = new 
ObjectOutputStream(socket.getOutputStream())
@@ -50,20 +74,19 @@ public class TcpInvocationHandler implements 
InvocationHandler {
             out.writeObject(method.getName());
             out.writeObject(args);
             out.flush();
-            return parseResult(socket);
-        } catch (Exception e) {
+            result = parseResult(socket);
+        } catch (Throwable e) {
             throw new RuntimeException("Error calling " + host + ":" + port + 
" method: " + method.getName(), e);
         }
+        if (result instanceof Throwable) {
+            throw (Throwable)result;
+        }
+        return result;
     }
 
-    private Object parseResult(Socket socket) throws IOException, 
ClassNotFoundException, Throwable {
+    private Object parseResult(Socket socket) throws Throwable {
         try (ObjectInputStream in = new 
LoaderObjectInputStream(socket.getInputStream(), cl)) {
-            Object result = in.readObject();
-            if (result instanceof Throwable) {
-                throw (Throwable)result;
-            } else {
-                return result;
-            }
+            return in.readObject();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/1c5922ce/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/TcpProviderTest.java
----------------------------------------------------------------------
diff --git 
a/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/TcpProviderTest.java
 
b/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/TcpProviderTest.java
index c9d69d2..fab0129 100644
--- 
a/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/TcpProviderTest.java
+++ 
b/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/TcpProviderTest.java
@@ -19,6 +19,7 @@
 package org.apache.aries.rsa.provider.tcp;
 
 import static org.hamcrest.core.StringStartsWith.startsWith;
+import static org.junit.Assert.assertEquals;
 
 import java.io.IOException;
 import java.net.SocketTimeoutException;
@@ -28,6 +29,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.aries.rsa.provider.tcp.myservice.MyService;
@@ -43,6 +45,7 @@ import org.osgi.framework.BundleContext;
 
 public class TcpProviderTest {
 
+    private static final int TIMEOUT = 200;
     private static final int NUM_CALLS = 100;
     private static MyService myServiceProxy;
     private static Endpoint ep;
@@ -55,7 +58,7 @@ public class TcpProviderTest {
         EndpointHelper.addObjectClass(props, exportedInterfaces);
         props.put("aries.rsa.hostname", "localhost");
         props.put("aries.rsa.numThreads", "10");
-        props.put("osgi.basic.timeout", 100);
+        props.put("osgi.basic.timeout", TIMEOUT);
         MyService myService = new MyServiceImpl();
         BundleContext bc = EasyMock.mock(BundleContext.class);
         ep = provider.exportService(myService, bc, props, exportedInterfaces);
@@ -68,7 +71,7 @@ public class TcpProviderTest {
     }
     
     @Test
-    public void testCallTimeout() throws IOException {
+    public void testCallTimeout() {
         try {
             myServiceProxy.call("slow");
             Assert.fail("Expecting timeout");
@@ -78,25 +81,25 @@ public class TcpProviderTest {
     }
 
     @Test
-    public void testPerf() throws IOException, InterruptedException {
+    public void testPerf() throws InterruptedException {
         runPerfTest(myServiceProxy);
         String msg = "test";
         String result = myServiceProxy.echo(msg);
         Assert.assertEquals(msg, result);
     }
     
-    @Test(expected=RuntimeException.class)
-    public void testCallException() throws IOException, InterruptedException {
+    @Test(expected=IllegalArgumentException.class)
+    public void testCallException() {
         myServiceProxy.call("throw exception");
     }
     
     @Test
-    public void testCall() throws IOException, InterruptedException {
+    public void testCall() {
         myServiceProxy.echo("test");
     }
     
     @Test
-    public void testCallOneway() throws IOException, InterruptedException {
+    public void testCallOneway() {
         myServiceProxy.callOneWay("test");
     }
     
@@ -109,6 +112,13 @@ public class TcpProviderTest {
         myServiceProxy.callWithList(msgList);
     }
 
+    @Test
+    public void testAsyncCall() throws Exception {
+        Future<String> result = myServiceProxy.callAsync(100);
+        String answer = result.get(1, TimeUnit.SECONDS);
+        assertEquals("Finished", answer);
+    }
+
     @AfterClass
     public static void close() throws IOException {
         ep.close();

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/1c5922ce/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/MyService.java
----------------------------------------------------------------------
diff --git 
a/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/MyService.java
 
b/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/MyService.java
index 6a791c9..da3b10c 100644
--- 
a/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/MyService.java
+++ 
b/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/MyService.java
@@ -19,6 +19,7 @@
 package org.apache.aries.rsa.provider.tcp.myservice;
 
 import java.util.List;
+import java.util.concurrent.Future;
 
 import javax.jws.Oneway;
 
@@ -32,5 +33,7 @@ public interface MyService {
     void callOneWay(String msg);
     
     void callWithList(List<String> msg);
+    
+    Future<String> callAsync(int delay); 
 
 }

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/1c5922ce/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/MyServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/MyServiceImpl.java
 
b/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/MyServiceImpl.java
index a39e155..0e4d88a 100644
--- 
a/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/MyServiceImpl.java
+++ 
b/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/MyServiceImpl.java
@@ -18,7 +18,11 @@
  */
 package org.apache.aries.rsa.provider.tcp.myservice;
 
+import static java.util.concurrent.CompletableFuture.supplyAsync;
+
 import java.util.List;
+import java.util.concurrent.Future;
+import java.util.function.Supplier;
 
 public class MyServiceImpl implements MyService {
 
@@ -34,7 +38,7 @@ public class MyServiceImpl implements MyService {
         }
         if ("slow".equals(msg)) {
             try {
-                Thread.sleep(200);
+                Thread.sleep(300);
             } catch (InterruptedException e) {
             }
         }
@@ -49,5 +53,18 @@ public class MyServiceImpl implements MyService {
         
     }
 
+    @Override
+    public Future<String> callAsync(final int delay) {
+        return supplyAsync(new Supplier<String>() {
+            public String get() {
+                try {
+                    Thread.sleep(delay);
+                } catch (InterruptedException e) {
+                }
+                return "Finished";
+            }
+            
+        });
+    }
 
 }

Reply via email to