Repository: aries-rsa Updated Branches: refs/heads/master a8a6a057d -> 1ff19b589
[ARIES-1757] Add support for promise Project: http://git-wip-us.apache.org/repos/asf/aries-rsa/repo Commit: http://git-wip-us.apache.org/repos/asf/aries-rsa/commit/1ff19b58 Tree: http://git-wip-us.apache.org/repos/asf/aries-rsa/tree/1ff19b58 Diff: http://git-wip-us.apache.org/repos/asf/aries-rsa/diff/1ff19b58 Branch: refs/heads/master Commit: 1ff19b589d9c5db6f46d5c506f8777cf4b1ca422 Parents: a8a6a05 Author: Christian Schneider <[email protected]> Authored: Thu Nov 23 13:25:59 2017 +0100 Committer: Christian Schneider <[email protected]> Committed: Thu Nov 23 13:25:59 2017 +0100 ---------------------------------------------------------------------- provider/tcp/pom.xml | 52 +++++++++++--------- .../aries/rsa/provider/tcp/TCPServer.java | 27 ++++++++-- .../rsa/provider/tcp/TcpInvocationHandler.java | 27 +++++++++- .../aries/rsa/provider/tcp/TcpProviderTest.java | 39 +++++++++++++-- .../tcp/myservice/ExpectedTestException.java | 11 +++++ .../rsa/provider/tcp/myservice/MyService.java | 10 ++-- .../provider/tcp/myservice/MyServiceImpl.java | 46 +++++++++++++---- 7 files changed, 166 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/1ff19b58/provider/tcp/pom.xml ---------------------------------------------------------------------- diff --git a/provider/tcp/pom.xml b/provider/tcp/pom.xml index a903860..f743563 100644 --- a/provider/tcp/pom.xml +++ b/provider/tcp/pom.xml @@ -1,28 +1,34 @@ -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - <parent> - <groupId>org.apache.aries.rsa</groupId> - <artifactId>org.apache.aries.rsa.parent</artifactId> - <version>1.12.0-SNAPSHOT</version> - <relativePath>../../parent/pom.xml</relativePath> - </parent> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.aries.rsa</groupId> + <artifactId>org.apache.aries.rsa.parent</artifactId> + <version>1.12.0-SNAPSHOT</version> + <relativePath>../../parent/pom.xml</relativePath> + </parent> - <groupId>org.apache.aries.rsa.provider</groupId> - <artifactId>org.apache.aries.rsa.provider.tcp</artifactId> - <packaging>bundle</packaging> - <name>Aries Remote Service Admin provider TCP</name> - <description>Provider for Java Serialization over TCP</description> + <groupId>org.apache.aries.rsa.provider</groupId> + <artifactId>org.apache.aries.rsa.provider.tcp</artifactId> + <packaging>bundle</packaging> + <name>Aries Remote Service Admin provider TCP</name> + <description>Provider for Java Serialization over TCP</description> - <properties> - <topDirectoryLocation>../..</topDirectoryLocation> - </properties> + <properties> + <topDirectoryLocation>../..</topDirectoryLocation> + </properties> - <dependencies> - <dependency> - <groupId>org.apache.aries.rsa</groupId> - <artifactId>org.apache.aries.rsa.spi</artifactId> - <scope>provided</scope> - </dependency> - </dependencies> + <dependencies> + <dependency> + <groupId>org.apache.aries.rsa</groupId> + <artifactId>org.apache.aries.rsa.spi</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.osgi</groupId> + <artifactId>org.osgi.util.promise</artifactId> + <version>1.0.0</version> + </dependency> + </dependencies> </project> http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/1ff19b58/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TCPServer.java ---------------------------------------------------------------------- diff --git a/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TCPServer.java b/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TCPServer.java index 85a7c31..a1c8775 100644 --- a/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TCPServer.java +++ b/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TCPServer.java @@ -26,11 +26,13 @@ import java.lang.reflect.InvocationTargetException; import java.net.ServerSocket; import java.net.Socket; import java.net.SocketException; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import org.osgi.util.promise.Promise; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -78,20 +80,37 @@ public class TCPServer implements Closeable, Runnable { } } - @SuppressWarnings("unchecked") private void handleCall(ObjectInputStream ois, ObjectOutputStream objectOutput) throws Exception { String methodName = (String)ois.readObject(); Object[] args = (Object[])ois.readObject(); Object result = invoker.invoke(methodName, args); + result = resolveAsnyc(result); if (result instanceof InvocationTargetException) { result = ((InvocationTargetException) result).getCause(); - } else if (result instanceof Future) { - Future<Object> fu = (Future<Object>) result; - result = fu.get(); } objectOutput.writeObject(result); } + @SuppressWarnings("unchecked") + private Object resolveAsnyc(Object result) throws InterruptedException { + if (result instanceof Future) { + Future<Object> fu = (Future<Object>) result; + try { + result = fu.get(); + } catch (ExecutionException e) { + result = e.getCause(); + } + } else if (result instanceof Promise) { + Promise<Object> fu = (Promise<Object>) result; + try { + result = fu.getValue(); + } catch (InvocationTargetException e) { + result = e.getCause(); + } + } + return result; + } + @Override public void close() throws IOException { this.serverSocket.close(); http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/1ff19b58/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpInvocationHandler.java ---------------------------------------------------------------------- diff --git a/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpInvocationHandler.java b/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpInvocationHandler.java index 8dea17f..ec59f3d 100644 --- a/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpInvocationHandler.java +++ b/provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpInvocationHandler.java @@ -29,6 +29,9 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; import java.util.function.Supplier; +import org.osgi.util.promise.Deferred; +import org.osgi.util.promise.Promise; + public class TcpInvocationHandler implements InvocationHandler { private String host; private int port; @@ -46,23 +49,43 @@ public class TcpInvocationHandler implements InvocationHandler { @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { if (Future.class.isAssignableFrom(method.getReturnType())) { - return createAsyncResult(method, args); + return createFutureResult(method, args); + } else if (Promise.class.isAssignableFrom(method.getReturnType())) { + return createPromiseResult(method, args); } else { return handleSyncCall(method, args); } } - private Object createAsyncResult(final Method method, final Object[] args) { + private Object createFutureResult(final Method method, final Object[] args) { return CompletableFuture.supplyAsync(new Supplier<Object>() { public Object get() { try { return handleSyncCall(method, args); + } catch (RuntimeException e) { + throw e; } catch (Throwable e) { throw new RuntimeException(e); } } }); } + + private Object createPromiseResult(final Method method, final Object[] args) { + final Deferred<Object> deferred = new Deferred<Object>(); + new Thread(new Runnable() { + + @Override + public void run() { + try { + deferred.resolve(handleSyncCall(method, args)); + } catch (Throwable e) { + deferred.fail(e); + } + } + }).start(); + return deferred.getPromise(); + } private Object handleSyncCall(Method method, Object[] args) throws Throwable { Object result; http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/1ff19b58/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/TcpProviderTest.java ---------------------------------------------------------------------- diff --git a/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/TcpProviderTest.java b/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/TcpProviderTest.java index 2160588..7a33c86 100644 --- a/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/TcpProviderTest.java +++ b/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/TcpProviderTest.java @@ -22,16 +22,19 @@ import static org.hamcrest.core.StringStartsWith.startsWith; import static org.junit.Assert.assertEquals; import java.io.IOException; +import java.lang.reflect.InvocationTargetException; import java.net.SocketTimeoutException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import org.apache.aries.rsa.provider.tcp.myservice.ExpectedTestException; import org.apache.aries.rsa.provider.tcp.myservice.MyService; import org.apache.aries.rsa.provider.tcp.myservice.MyServiceImpl; import org.apache.aries.rsa.spi.Endpoint; @@ -42,6 +45,7 @@ import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; import org.osgi.framework.BundleContext; +import org.osgi.util.promise.Promise; public class TcpProviderTest { @@ -73,7 +77,7 @@ public class TcpProviderTest { @Test public void testCallTimeout() { try { - myServiceProxy.callSlow(); + myServiceProxy.callSlow(TIMEOUT + 100); Assert.fail("Expecting timeout"); } catch (RuntimeException e) { Assert.assertEquals(SocketTimeoutException.class, e.getCause().getClass()); @@ -88,7 +92,7 @@ public class TcpProviderTest { Assert.assertEquals(msg, result); } - @Test(expected=IllegalArgumentException.class) + @Test(expected=ExpectedTestException.class) public void testCallException() { myServiceProxy.callException(); } @@ -113,11 +117,38 @@ public class TcpProviderTest { } @Test - public void testAsyncCall() throws Exception { - Future<String> result = myServiceProxy.callAsync(100); + public void testAsyncFuture() throws Exception { + Future<String> result = myServiceProxy.callAsyncFuture(100); String answer = result.get(1, TimeUnit.SECONDS); assertEquals("Finished", answer); } + + @Test(expected = ExpectedTestException.class) + public void testAsyncFutureException() throws Throwable { + Future<String> result = myServiceProxy.callAsyncFuture(-1); + try { + result.get(); + } catch (ExecutionException e) { + throw e.getCause(); + } + } + + @Test + public void testAsyncPromise() throws Exception { + Promise<String> result = myServiceProxy.callAsyncPromise(100); + String answer = result.getValue(); + assertEquals("Finished", answer); + } + + @Test(expected = ExpectedTestException.class) + public void testAsyncPromiseException() throws Throwable { + Promise<String> result = myServiceProxy.callAsyncPromise(-1); + try { + result.getValue(); + } catch (InvocationTargetException e) { + throw e.getCause(); + } + } @AfterClass public static void close() throws IOException { http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/1ff19b58/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/ExpectedTestException.java ---------------------------------------------------------------------- diff --git a/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/ExpectedTestException.java b/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/ExpectedTestException.java new file mode 100644 index 0000000..ba7d084 --- /dev/null +++ b/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/ExpectedTestException.java @@ -0,0 +1,11 @@ +package org.apache.aries.rsa.provider.tcp.myservice; + +public class ExpectedTestException extends RuntimeException { + + public ExpectedTestException() { + super(); + } + + private static final long serialVersionUID = -6271694671646172358L; + +} http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/1ff19b58/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/MyService.java ---------------------------------------------------------------------- diff --git a/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/MyService.java b/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/MyService.java index cfeb32b..026743d 100644 --- a/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/MyService.java +++ b/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/MyService.java @@ -23,19 +23,23 @@ import java.util.concurrent.Future; import javax.jws.Oneway; +import org.osgi.util.promise.Promise; + public interface MyService { String echo(String msg); - void callSlow(); + void callSlow(int delay); void callException(); - + // Oneway not yet supported @Oneway void callOneWay(String msg); void callWithList(List<String> msg); - Future<String> callAsync(int delay); + Future<String> callAsyncFuture(int delay); + + Promise<String> callAsyncPromise(int delay); } http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/1ff19b58/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/MyServiceImpl.java ---------------------------------------------------------------------- diff --git a/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/MyServiceImpl.java b/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/MyServiceImpl.java index 1a5a48c..a682cb3 100644 --- a/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/MyServiceImpl.java +++ b/provider/tcp/src/test/java/org/apache/aries/rsa/provider/tcp/myservice/MyServiceImpl.java @@ -24,6 +24,9 @@ import java.util.List; import java.util.concurrent.Future; import java.util.function.Supplier; +import org.osgi.util.promise.Deferred; +import org.osgi.util.promise.Promise; + public class MyServiceImpl implements MyService { @Override @@ -32,16 +35,13 @@ public class MyServiceImpl implements MyService { } @Override - public void callSlow() { - try { - Thread.sleep(300); - } catch (InterruptedException e) { - } + public void callSlow(int delay) { + sleep(delay); } @Override public void callException() { - throw new IllegalArgumentException("Throwing expected exception"); + throw new ExpectedTestException(); } @Override @@ -54,17 +54,43 @@ public class MyServiceImpl implements MyService { } @Override - public Future<String> callAsync(final int delay) { + public Future<String> callAsyncFuture(final int delay) { return supplyAsync(new Supplier<String>() { public String get() { - try { - Thread.sleep(delay); - } catch (InterruptedException e) { + if (delay == -1) { + throw new ExpectedTestException(); } + sleep(delay); return "Finished"; } }); } + + @Override + public Promise<String> callAsyncPromise(final int delay) { + final Deferred<String> deferred = new Deferred<String>(); + new Thread(new Runnable() { + + @Override + public void run() { + if (delay == -1) { + deferred.fail(new ExpectedTestException()); + return; + } + sleep(delay); + deferred.resolve("Finished"); + } + }).start(); + + return deferred.getPromise(); + } + + private void sleep(int delay) { + try { + Thread.sleep(delay); + } catch (InterruptedException e) { + } + } }
