Repository: aries-rsa
Updated Branches:
  refs/heads/master 1ff19b589 -> 84d31c7db


[ARIES-1757] Add support for CompletionStage


Project: http://git-wip-us.apache.org/repos/asf/aries-rsa/repo
Commit: http://git-wip-us.apache.org/repos/asf/aries-rsa/commit/84d31c7d
Tree: http://git-wip-us.apache.org/repos/asf/aries-rsa/tree/84d31c7d
Diff: http://git-wip-us.apache.org/repos/asf/aries-rsa/diff/84d31c7d

Branch: refs/heads/master
Commit: 84d31c7dbcc496ec2a452c5c436ca3aed42dac5e
Parents: 1ff19b5
Author: Christian Schneider <[email protected]>
Authored: Thu Nov 23 15:08:54 2017 +0100
Committer: Christian Schneider <[email protected]>
Committed: Thu Nov 23 15:08:54 2017 +0100

----------------------------------------------------------------------
 .../aries/rsa/provider/tcp/TCPServer.java       |  8 ++++++++
 .../rsa/provider/tcp/TcpInvocationHandler.java  |  8 +++++---
 .../aries/rsa/provider/tcp/TcpProviderTest.java | 21 ++++++++++++++++++++
 .../rsa/provider/tcp/myservice/MyService.java   |  5 ++++-
 .../provider/tcp/myservice/MyServiceImpl.java   | 15 ++++++++++++++
 5 files changed, 53 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/84d31c7d/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TCPServer.java
----------------------------------------------------------------------
diff --git 
a/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TCPServer.java 
b/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TCPServer.java
index a1c8775..ce1b06a 100644
--- 
a/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TCPServer.java
+++ 
b/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TCPServer.java
@@ -26,6 +26,7 @@ import java.lang.reflect.InvocationTargetException;
 import java.net.ServerSocket;
 import java.net.Socket;
 import java.net.SocketException;
+import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -100,6 +101,13 @@ public class TCPServer implements Closeable, Runnable {
             } catch (ExecutionException e) {
                 result = e.getCause();
             }
+        } else if (result instanceof CompletionStage) {
+            CompletionStage<Object> fu = (CompletionStage<Object>) result;
+            try {
+                result = fu.toCompletableFuture().get();
+            } catch (ExecutionException e) {
+                result = e.getCause();
+            }
         } else if (result instanceof Promise) {
             Promise<Object> fu = (Promise<Object>) result;  
             try {

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/84d31c7d/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpInvocationHandler.java
----------------------------------------------------------------------
diff --git 
a/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpInvocationHandler.java
 
b/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpInvocationHandler.java
index ec59f3d..fbda0ec 100644
--- 
a/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpInvocationHandler.java
+++ 
b/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpInvocationHandler.java
@@ -26,6 +26,7 @@ import java.lang.reflect.Method;
 import java.net.Socket;
 import java.net.UnknownHostException;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
 import java.util.concurrent.Future;
 import java.util.function.Supplier;
 
@@ -48,10 +49,11 @@ public class TcpInvocationHandler implements 
InvocationHandler {
 
     @Override
     public Object invoke(Object proxy, Method method, Object[] args) throws 
Throwable {
-        if (Future.class.isAssignableFrom(method.getReturnType())) {
+        if (Future.class.isAssignableFrom(method.getReturnType()) ||
+            CompletionStage.class.isAssignableFrom(method.getReturnType())) {
             return createFutureResult(method, args);
         } else if (Promise.class.isAssignableFrom(method.getReturnType())) {
-                return createPromiseResult(method, args);
+            return createPromiseResult(method, args);
         } else {
             return handleSyncCall(method, args);
         }
@@ -70,7 +72,7 @@ public class TcpInvocationHandler implements 
InvocationHandler {
             }
         });
     }
-    
+
     private Object createPromiseResult(final Method method, final Object[] 
args) {
         final Deferred<Object> deferred = new Deferred<Object>();
         new Thread(new Runnable() {

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/84d31c7d/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/TcpProviderTest.java
----------------------------------------------------------------------
diff --git 
a/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/TcpProviderTest.java
 
b/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/TcpProviderTest.java
index 7a33c86..538b52b 100644
--- 
a/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/TcpProviderTest.java
+++ 
b/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/TcpProviderTest.java
@@ -28,6 +28,8 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -134,6 +136,25 @@ public class TcpProviderTest {
     }
     
     @Test
+    public void testAsyncCompletionStage() throws Exception {
+        CompletionStage<String> result = 
myServiceProxy.callAsyncCompletionStage(100);
+        CompletableFuture<String> fresult = result.toCompletableFuture();
+        String answer = fresult.get(1, TimeUnit.SECONDS);
+        assertEquals("Finished", answer);
+    }
+    
+    @Test(expected = ExpectedTestException.class)
+    public void testAsyncCompletionStageException() throws Throwable {
+        CompletionStage<String> result = 
myServiceProxy.callAsyncCompletionStage(-1);
+        CompletableFuture<String> fresult = result.toCompletableFuture();
+        try {
+            fresult.get();
+        } catch (ExecutionException e) {
+            throw e.getCause();
+        }
+    }
+    
+    @Test
     public void testAsyncPromise() throws Exception {
         Promise<String> result = myServiceProxy.callAsyncPromise(100);
         String answer = result.getValue();

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/84d31c7d/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/MyService.java
----------------------------------------------------------------------
diff --git 
a/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/MyService.java
 
b/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/MyService.java
index 026743d..79853b8 100644
--- 
a/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/MyService.java
+++ 
b/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/MyService.java
@@ -19,6 +19,7 @@
 package org.apache.aries.rsa.provider.tcp.myservice;
 
 import java.util.List;
+import java.util.concurrent.CompletionStage;
 import java.util.concurrent.Future;
 
 import javax.jws.Oneway;
@@ -40,6 +41,8 @@ public interface MyService {
     
     Future<String> callAsyncFuture(int delay);
 
-    Promise<String> callAsyncPromise(int delay); 
+    Promise<String> callAsyncPromise(int delay);
+
+    CompletionStage<String> callAsyncCompletionStage(int delay); 
 
 }

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/84d31c7d/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/MyServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/MyServiceImpl.java
 
b/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/MyServiceImpl.java
index a682cb3..a030c9e 100644
--- 
a/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/MyServiceImpl.java
+++ 
b/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/MyServiceImpl.java
@@ -21,6 +21,7 @@ package org.apache.aries.rsa.provider.tcp.myservice;
 import static java.util.concurrent.CompletableFuture.supplyAsync;
 
 import java.util.List;
+import java.util.concurrent.CompletionStage;
 import java.util.concurrent.Future;
 import java.util.function.Supplier;
 
@@ -68,6 +69,20 @@ public class MyServiceImpl implements MyService {
     }
     
     @Override
+    public CompletionStage<String> callAsyncCompletionStage(final int delay) {
+        return supplyAsync(new Supplier<String>() {
+            public String get() {
+                if (delay == -1) {
+                    throw new ExpectedTestException();
+                }
+                sleep(delay);
+                return "Finished";
+            }
+            
+        });
+    }
+    
+    @Override
     public Promise<String> callAsyncPromise(final int delay) {
         final Deferred<String> deferred = new Deferred<String>();
         new Thread(new Runnable() {

Reply via email to