Repository: aries-rsa Updated Branches: refs/heads/master 1ff19b589 -> 84d31c7db
[ARIES-1757] Add support for CompletionStage Project: http://git-wip-us.apache.org/repos/asf/aries-rsa/repo Commit: http://git-wip-us.apache.org/repos/asf/aries-rsa/commit/84d31c7d Tree: http://git-wip-us.apache.org/repos/asf/aries-rsa/tree/84d31c7d Diff: http://git-wip-us.apache.org/repos/asf/aries-rsa/diff/84d31c7d Branch: refs/heads/master Commit: 84d31c7dbcc496ec2a452c5c436ca3aed42dac5e Parents: 1ff19b5 Author: Christian Schneider <[email protected]> Authored: Thu Nov 23 15:08:54 2017 +0100 Committer: Christian Schneider <[email protected]> Committed: Thu Nov 23 15:08:54 2017 +0100 ---------------------------------------------------------------------- .../aries/rsa/provider/tcp/TCPServer.java | 8 ++++++++ .../rsa/provider/tcp/TcpInvocationHandler.java | 8 +++++--- .../aries/rsa/provider/tcp/TcpProviderTest.java | 21 ++++++++++++++++++++ .../rsa/provider/tcp/myservice/MyService.java | 5 ++++- .../provider/tcp/myservice/MyServiceImpl.java | 15 ++++++++++++++ 5 files changed, 53 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/84d31c7d/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TCPServer.java ---------------------------------------------------------------------- diff --git a/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TCPServer.java b/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TCPServer.java index a1c8775..ce1b06a 100644 --- a/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TCPServer.java +++ b/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TCPServer.java @@ -26,6 +26,7 @@ import java.lang.reflect.InvocationTargetException; import java.net.ServerSocket; import java.net.Socket; import java.net.SocketException; +import java.util.concurrent.CompletionStage; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -100,6 +101,13 @@ public class TCPServer implements Closeable, Runnable { } catch (ExecutionException e) { result = e.getCause(); } + } else if (result instanceof CompletionStage) { + CompletionStage<Object> fu = (CompletionStage<Object>) result; + try { + result = fu.toCompletableFuture().get(); + } catch (ExecutionException e) { + result = e.getCause(); + } } else if (result instanceof Promise) { Promise<Object> fu = (Promise<Object>) result; try { http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/84d31c7d/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpInvocationHandler.java ---------------------------------------------------------------------- diff --git a/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpInvocationHandler.java b/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpInvocationHandler.java index ec59f3d..fbda0ec 100644 --- a/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpInvocationHandler.java +++ b/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpInvocationHandler.java @@ -26,6 +26,7 @@ import java.lang.reflect.Method; import java.net.Socket; import java.net.UnknownHostException; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.concurrent.Future; import java.util.function.Supplier; @@ -48,10 +49,11 @@ public class TcpInvocationHandler implements InvocationHandler { @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { - if (Future.class.isAssignableFrom(method.getReturnType())) { + if (Future.class.isAssignableFrom(method.getReturnType()) || + CompletionStage.class.isAssignableFrom(method.getReturnType())) { return createFutureResult(method, args); } else if (Promise.class.isAssignableFrom(method.getReturnType())) { - return createPromiseResult(method, args); + return createPromiseResult(method, args); } else { return handleSyncCall(method, args); } @@ -70,7 +72,7 @@ public class TcpInvocationHandler implements InvocationHandler { } }); } - + private Object createPromiseResult(final Method method, final Object[] args) { final Deferred<Object> deferred = new Deferred<Object>(); new Thread(new Runnable() { http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/84d31c7d/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/TcpProviderTest.java ---------------------------------------------------------------------- diff --git a/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/TcpProviderTest.java b/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/TcpProviderTest.java index 7a33c86..538b52b 100644 --- a/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/TcpProviderTest.java +++ b/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/TcpProviderTest.java @@ -28,6 +28,8 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -134,6 +136,25 @@ public class TcpProviderTest { } @Test + public void testAsyncCompletionStage() throws Exception { + CompletionStage<String> result = myServiceProxy.callAsyncCompletionStage(100); + CompletableFuture<String> fresult = result.toCompletableFuture(); + String answer = fresult.get(1, TimeUnit.SECONDS); + assertEquals("Finished", answer); + } + + @Test(expected = ExpectedTestException.class) + public void testAsyncCompletionStageException() throws Throwable { + CompletionStage<String> result = myServiceProxy.callAsyncCompletionStage(-1); + CompletableFuture<String> fresult = result.toCompletableFuture(); + try { + fresult.get(); + } catch (ExecutionException e) { + throw e.getCause(); + } + } + + @Test public void testAsyncPromise() throws Exception { Promise<String> result = myServiceProxy.callAsyncPromise(100); String answer = result.getValue(); http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/84d31c7d/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/MyService.java ---------------------------------------------------------------------- diff --git a/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/MyService.java b/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/MyService.java index 026743d..79853b8 100644 --- a/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/MyService.java +++ b/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/MyService.java @@ -19,6 +19,7 @@ package org.apache.aries.rsa.provider.tcp.myservice; import java.util.List; +import java.util.concurrent.CompletionStage; import java.util.concurrent.Future; import javax.jws.Oneway; @@ -40,6 +41,8 @@ public interface MyService { Future<String> callAsyncFuture(int delay); - Promise<String> callAsyncPromise(int delay); + Promise<String> callAsyncPromise(int delay); + + CompletionStage<String> callAsyncCompletionStage(int delay); } http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/84d31c7d/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/MyServiceImpl.java ---------------------------------------------------------------------- diff --git a/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/MyServiceImpl.java b/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/MyServiceImpl.java index a682cb3..a030c9e 100644 --- a/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/MyServiceImpl.java +++ b/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/MyServiceImpl.java @@ -21,6 +21,7 @@ package org.apache.aries.rsa.provider.tcp.myservice; import static java.util.concurrent.CompletableFuture.supplyAsync; import java.util.List; +import java.util.concurrent.CompletionStage; import java.util.concurrent.Future; import java.util.function.Supplier; @@ -68,6 +69,20 @@ public class MyServiceImpl implements MyService { } @Override + public CompletionStage<String> callAsyncCompletionStage(final int delay) { + return supplyAsync(new Supplier<String>() { + public String get() { + if (delay == -1) { + throw new ExpectedTestException(); + } + sleep(delay); + return "Finished"; + } + + }); + } + + @Override public Promise<String> callAsyncPromise(final int delay) { final Deferred<String> deferred = new Deferred<String>(); new Thread(new Runnable() {
