Repository: aries-rsa
Updated Branches:
  refs/heads/master fe14770a7 -> 42087ca27


[ARIES-1586] Async calls in fastbin

remote methods can now return Future/CompletableFuture/Promise for async
invocation.
Also adds a Readme.md for fastbin
closes #13


Project: http://git-wip-us.apache.org/repos/asf/aries-rsa/repo
Commit: http://git-wip-us.apache.org/repos/asf/aries-rsa/commit/42087ca2
Tree: http://git-wip-us.apache.org/repos/asf/aries-rsa/tree/42087ca2
Diff: http://git-wip-us.apache.org/repos/asf/aries-rsa/diff/42087ca2

Branch: refs/heads/master
Commit: 42087ca275ad804ee8ff2fda97b6a5b480e3fd06
Parents: fe14770
Author: Johannes Utzig <[email protected]>
Authored: Mon Jul 25 12:55:22 2016 +0200
Committer: Christian Schneider <[email protected]>
Committed: Thu Aug 4 14:57:05 2016 +0200

----------------------------------------------------------------------
 Readme.md                                       |  22 +-
 provider/fastbin/Readme.md                      |  23 ++
 provider/fastbin/pom.xml                        |  15 ++
 .../fastbin/tcp/AbstractInvocationStrategy.java | 132 +++++++++++
 .../tcp/AsyncFutureInvocationStrategy.java      | 232 +++++++++++++++++++
 .../fastbin/tcp/AsyncInvocationStrategy.java    |  86 ++-----
 .../tcp/AsyncPromiseInvocationStrategy.java     | 139 +++++++++++
 .../fastbin/tcp/BlockingInvocationStrategy.java |  18 +-
 .../provider/fastbin/tcp/ClientInvokerImpl.java |   7 +-
 .../provider/fastbin/tcp/InvocationType.java    | 106 +++++++++
 .../provider/fastbin/tcp/ServerInvokerImpl.java |   7 +-
 .../provider/fastbin/FutureInvocationTest.java  | 201 ++++++++++++++++
 .../provider/fastbin/PromiseInvocationTest.java | 169 ++++++++++++++
 13 files changed, 1067 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/42087ca2/Readme.md
----------------------------------------------------------------------
diff --git a/Readme.md b/Readme.md
index 3b282e5..cc76e26 100644
--- a/Readme.md
+++ b/Readme.md
@@ -1,8 +1,24 @@
 # Aries Remote Service Admin (RSA)
 
 The [Aries Remote Service Admin (RSA)](http://aries.apache.org/modules/rsa.html) project allows to transparently use OSGi
-services for remote communication. OSGi services can be marked for export by adding a service property 
+services for remote communication. OSGi services can be marked for export by adding a service property
 service.exported.interfaces=*. Various other  properties can be used to customize how the service is to be exposed.
 
-For more information, check out section "122 JPA Service Specification Version 1.0" in the "OSGi Service Platform
- Enterprise Specification, Release 4, Version 4.2" available for public download from the OSGi Alliance. 
+For more information, check out section "13 Remote Services Version 1.0" in the "OSGi Service Platform
+ Service Compendium, Release 4, Version 4.2" available for public download from the OSGi Alliance.
+
+## Distribution Provider
+
+Aries Remote Service Admin provides two different transport layers out of the box and can be extended with custom transports.
+Please refer to their individual Readme.md on how to use them.
+
+ * `aries.tcp` - A very lightweight TCP based transport that is ideal to get a first demo running and to serve as template for custom distribution providers
+ * `aries.fastbin` - A fast binary transport that uses multiplexing on a pool of java nio channels. Fastbin supports both sync and long running async calls (via Future/Promise)
+
+## Discovery Provider
+
+The discovery providers are responsible for finding the available endpoint descriptions of remote services. Aries RSA provides three different implementations and can be extended with custom discovery providers. The three available implementations are
+
+ * zookeeper - Manages endpoint descriptions as zookeeper nodes.
+ * local - Scans bundles for endpoint descriptions
+ * config - Reads endpoint descriptions from ConfigAdmin service
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/42087ca2/provider/fastbin/Readme.md
----------------------------------------------------------------------
diff --git a/provider/fastbin/Readme.md b/provider/fastbin/Readme.md
new file mode 100644
index 0000000..2997a1b
--- /dev/null
+++ b/provider/fastbin/Readme.md
@@ -0,0 +1,23 @@
+# Fastbin transport provider
+
+Allows transparent remoting using Java Serialization over TCP. The fastbin provider uses a pool of nio tcp channels to transport data.
+It can use either java object serialization or protobuf to serialize parameters and return values.
+Sync remote calls have a default timeout of 5 minutes. For long running operations async calls should be used. This is indicated by having either
+
+ * `Future`
+ * `CompletableFuture`
+ * `Promise`
+
+as the return value of the remote method. The client will receive a proxy of that type that will be resolved async as soon as the server has finished the computation.
+
+
+## Endpoint Configuration
+
+service.exported.configs: aries.fastbin
+
+| Key                      | Default               | Description                                              |
+| -------------------------| --------------------- | -------------------------------------------------------- |
+| uri                      | tcp://0.0.0.0:2543    | The bind address to use                                  |
+| exportAddress            | looks up the hostname | The ip/hostname how remote clients can reach this server |
+| timeout                  | 300000                | The timeout for sync calls (default 5 minutes)           |
+
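
The fastbin Readme above says that an async call is indicated purely by the return type of the remote method. As a minimal sketch of what such a service contract might look like: `ReportService`, `ReportServiceImpl` and `callAndWait` are hypothetical names that do not appear in this commit, and the call below runs locally without any fastbin transport involved.

```java
import java.util.concurrent.CompletableFuture;

public class AsyncServiceSketch {

    /** Hypothetical service interface: the CompletableFuture return type marks the method as async. */
    public interface ReportService {
        CompletableFuture<String> render(String name);
    }

    /** Hypothetical server-side implementation that computes the result on another thread. */
    public static class ReportServiceImpl implements ReportService {
        @Override
        public CompletableFuture<String> render(String name) {
            return CompletableFuture.supplyAsync(() -> "report:" + name);
        }
    }

    /** Client-side view: the caller gets a future immediately and decides when to wait for it. */
    public static String callAndWait(ReportService service, String name) {
        CompletableFuture<String> pending = service.render(name);
        // join() blocks until the future is completed; a real client could chain callbacks instead.
        return pending.join();
    }

    public static void main(String[] args) {
        System.out.println(callAndWait(new ReportServiceImpl(), "q3"));
    }
}
```

With a remote endpoint, the client would presumably hold a proxy of `ReportService` instead of the implementation, while the calling code stays the same.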

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/42087ca2/provider/fastbin/pom.xml
----------------------------------------------------------------------
diff --git a/provider/fastbin/pom.xml b/provider/fastbin/pom.xml
index 59e5417..5e1a5b9 100644
--- a/provider/fastbin/pom.xml
+++ b/provider/fastbin/pom.xml
@@ -37,11 +37,26 @@
             <artifactId>hawtbuf-proto</artifactId>
             <optional>true</optional>
         </dependency>
+        <!-- this is only needed if you want to use the Promise for remote services -->
+        <dependency>
+            <groupId>org.osgi</groupId>
+            <artifactId>org.osgi.util.promise</artifactId>
+            <version>1.0.0</version>
+            <optional>true</optional>
+        </dependency>
     </dependencies>
 
     <build>
         <plugins>
             <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <configuration>
+                    <source>1.8</source>
+                    <target>1.8</target>
+                </configuration>
+            </plugin>
+            <plugin>
                 <groupId>org.fusesource.hawtbuf</groupId>
                 <artifactId>hawtbuf-protoc</artifactId>
                 <version>${hawtbuf.version}</version>

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/42087ca2/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/AbstractInvocationStrategy.java
----------------------------------------------------------------------
diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/AbstractInvocationStrategy.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/AbstractInvocationStrategy.java
new file mode 100644
index 0000000..75f06a9
--- /dev/null
+++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/AbstractInvocationStrategy.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.rsa.provider.fastbin.tcp;
+
+import java.lang.reflect.Method;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.aries.rsa.provider.fastbin.api.SerializationStrategy;
+import org.fusesource.hawtbuf.DataByteArrayInputStream;
+import org.fusesource.hawtbuf.DataByteArrayOutputStream;
+import org.osgi.framework.ServiceException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class AbstractInvocationStrategy implements InvocationStrategy
+{
+
+    protected final static Logger LOGGER = LoggerFactory.getLogger(AbstractInvocationStrategy.class);
+
+    @Override
+    public ResponseFuture request(SerializationStrategy serializationStrategy, ClassLoader loader, Method method, Object[] args, DataByteArrayOutputStream requestStream) throws Exception {
+        encodeRequest(serializationStrategy, loader, method, args, requestStream);
+        return createResponse(serializationStrategy, loader,method, args);
+    }
+
+    /**
+     * encodes the request to the stream
+     * @param serializationStrategy
+     * @param loader
+     * @param method
+     * @param args
+     * @param requestStream
+     * @throws Exception
+     */
+    protected void encodeRequest(SerializationStrategy serializationStrategy, ClassLoader loader, Method method, Object[] args, DataByteArrayOutputStream requestStream) throws Exception {
+        serializationStrategy.encodeRequest(loader, method.getParameterTypes(), args, requestStream);
+    }
+
+    /**
+     * creates a response for the remote method call
+     * @param serializationStrategy
+     * @param loader
+     * @param method
+     * @param args
+     * @return
+     * @throws Exception
+     */
+    protected abstract ResponseFuture createResponse(SerializationStrategy serializationStrategy, ClassLoader loader, Method method, Object[] args) throws Exception;
+
+
+    @Override
+    public final void service(SerializationStrategy serializationStrategy, ClassLoader loader, Method method, Object target, DataByteArrayInputStream requestStream, DataByteArrayOutputStream responseStream, Runnable onComplete) {
+        doService(serializationStrategy, loader, method, target, requestStream, responseStream, onComplete);
+
+    }
+
+    /**
+     * performs the actual remote call using the provided parameters
+     * @param serializationStrategy the strategy to serialize the objects with
+     * @param loader the classloader to use
+     * @param method the method to call
+     * @param target the object to call the method on
+     * @param requestStream
+     * @param responseStream
+     * @param onComplete to be executed after the call has finished
+     */
+    protected abstract void doService(SerializationStrategy serializationStrategy, ClassLoader loader, Method method, Object target, DataByteArrayInputStream requestStream, DataByteArrayOutputStream responseStream, Runnable onComplete);
+
+    protected Class getResultType(Method method) {
+        return method.getReturnType();
+    }
+
+    protected class AsyncServiceResponse {
+
+        private final ClassLoader loader;
+        private final Method method;
+        private final DataByteArrayOutputStream responseStream;
+        private final Runnable onComplete;
+        private final SerializationStrategy serializationStrategy;
+        private final int pos;
+        // Used to protect against sending multiple responses.
+        final AtomicBoolean responded = new AtomicBoolean(false);
+
+        public AsyncServiceResponse(ClassLoader loader, Method method, DataByteArrayOutputStream responseStream, Runnable onComplete, SerializationStrategy serializationStrategy) {
+            this.loader = loader;
+            this.method = method;
+            this.responseStream = responseStream;
+            this.onComplete = onComplete;
+            this.serializationStrategy = serializationStrategy;
+            pos = responseStream.position();
+        }
+
+        public void send(Throwable error, Object value) {
+            if( responded.compareAndSet(false, true) ) {
+                Class resultType = getResultType(method);
+                try {
+                    serializationStrategy.encodeResponse(loader, resultType, value, error, responseStream);
+                } catch (Exception e) {
+                    // we failed to encode the response.. reposition and write that error.
+                    try {
+                        responseStream.position(pos);
+                        serializationStrategy.encodeResponse(loader, resultType, value, new ServiceException(e.toString()), responseStream);
+                    } catch (Exception unexpected) {
+                        LOGGER.error("Error while servicing "+method,unexpected);
+                    }
+                } finally {
+                    onComplete.run();
+                }
+            }
+        }
+    }
+}
+
+
+
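
`AsyncServiceResponse.send` above relies on `AtomicBoolean.compareAndSet` so that only the first response is written. A minimal sketch of that guard in isolation: `OnceResponder` and `demo` are hypothetical names, and a plain `String` stands in for the serialized response stream.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class RespondOnceSketch {

    /** Hypothetical stand-in for a response helper: only the first send() is accepted. */
    public static class OnceResponder {
        // Flips from false to true exactly once, even if several threads race on send().
        private final AtomicBoolean responded = new AtomicBoolean(false);
        private volatile String sent;

        /** Returns true if this call delivered the response, false if one was already sent. */
        public boolean send(String payload) {
            if (responded.compareAndSet(false, true)) {
                sent = payload;
                return true;
            }
            return false;
        }

        public String sent() {
            return sent;
        }
    }

    /** Sends twice and returns what was actually recorded. */
    public static String demo() {
        OnceResponder responder = new OnceResponder();
        responder.send("value");
        responder.send("late error"); // ignored, a response was already sent
        return responder.sent();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The guard matters because a result and a late failure can both try to answer the same request; without it the response stream could be written twice.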

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/42087ca2/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/AsyncFutureInvocationStrategy.java
----------------------------------------------------------------------
diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/AsyncFutureInvocationStrategy.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/AsyncFutureInvocationStrategy.java
new file mode 100644
index 0000000..a46faf4
--- /dev/null
+++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/AsyncFutureInvocationStrategy.java
@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.rsa.provider.fastbin.tcp;
+
+import java.lang.reflect.Method;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BiConsumer;
+
+import org.apache.aries.rsa.provider.fastbin.api.AsyncCallback;
+import org.apache.aries.rsa.provider.fastbin.api.SerializationStrategy;
+import org.fusesource.hawtbuf.DataByteArrayInputStream;
+import org.fusesource.hawtbuf.DataByteArrayOutputStream;
+import org.fusesource.hawtdispatch.Dispatch;
+import org.fusesource.hawtdispatch.DispatchQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AsyncFutureInvocationStrategy extends AbstractInvocationStrategy {
+
+    private FutureCompleter completer = new FutureCompleter();
+
+    @SuppressWarnings("unchecked")
+    protected void doService(SerializationStrategy serializationStrategy, ClassLoader loader, Method method, Object target, DataByteArrayInputStream requestStream, final DataByteArrayOutputStream responseStream, final Runnable onComplete) {
+
+        final AsyncServiceResponse helper = new AsyncServiceResponse(loader, method, responseStream, onComplete, serializationStrategy);
+        try {
+            Class<?>[] types = method.getParameterTypes();
+            final Object[] args = new Object[types.length];
+            serializationStrategy.decodeRequest(loader, types, requestStream, args);
+            Future<Object> future = (Future<Object>)method.invoke(target, args);
+            CompletableFuture<Object> completable = null;
+            if(future instanceof CompletableFuture) {
+                completable = (CompletableFuture<Object>)future;
+            }
+            else {
+                completable = completer.complete(future);
+            }
+            completable.whenComplete(new BiConsumer<Object, Throwable>() {
+                public void accept(Object returnValue, Throwable exception) {
+                    helper.send(exception, returnValue);
+                }
+            });
+
+        } catch (Throwable t) {
+            helper.send(t, null);
+        }
+    }
+
+
+    @Override
+    protected ResponseFuture createResponse(SerializationStrategy serializationStrategy, ClassLoader loader, Method method, Object[] args) throws Exception {
+        return new AsyncResponseFuture(loader, method, serializationStrategy, Dispatch.getCurrentQueue());
+    }
+
+    protected Class getResultType(Method method) {
+        try {
+            Type type = method.getGenericReturnType();
+            ParameterizedType t = (ParameterizedType) type;
+            return (Class) t.getActualTypeArguments()[0];
+        }
+        catch (Exception e) {
+            return super.getResultType(method);
+        }
+    }
+
+    @SuppressWarnings({"rawtypes"})
+    private class AsyncResponseFuture implements ResponseFuture, AsyncCallback {
+
+        private final ClassLoader loader;
+        private final Method method;
+        private final SerializationStrategy serializationStrategy;
+        private final DispatchQueue queue;
+        private CompletableFuture<Object> future;
+
+        public AsyncResponseFuture(ClassLoader loader, Method method, SerializationStrategy serializationStrategy, DispatchQueue queue) {
+            this.loader = loader;
+            this.method = method;
+            this.serializationStrategy = serializationStrategy;
+            this.queue = queue;
+            this.future = new CompletableFuture<>();
+        }
+
+        public void set(final DataByteArrayInputStream source) {
+            if( queue != null ) {
+                queue.execute(new Runnable() {
+                    public void run() {
+                        decodeIt(source);
+                    }
+                });
+            } else {
+                decodeIt(source);
+            }
+        }
+
+        private void decodeIt(DataByteArrayInputStream source) {
+            try {
+                serializationStrategy.decodeResponse(loader, getResultType(method), source, this);
+            } catch (Throwable e) {
+                onFailure(e);
+            }
+        }
+
+        @Override
+        public void fail(Throwable throwable) {
+
+            onFailure(throwable);
+        }
+
+        @Override
+        public void onSuccess(Object result) {
+            future.complete(result);
+        }
+
+        @Override
+        public void onFailure(Throwable failure) {
+            future.completeExceptionally(failure);
+        }
+
+        @Override
+        public Object get(long timeout, TimeUnit unit) throws Exception
+        {
+            return future;
+        }
+    }
+
+    /**
+     * Helper class that polls available futures in a background thread for readiness
+     * and reports them to a completable future
+     *
+     */
+    private static class FutureCompleter extends Thread {
+
+        private ConcurrentMap<Future<Object>, CompletableFuture<Object>> futures;
+        private Semaphore counter;
+        private AtomicBoolean started;
+
+        public FutureCompleter() {
+            setName("Fastbin-Future-Completer");
+            setDaemon(true);
+            futures = new ConcurrentHashMap<>();
+            counter = new Semaphore(0);
+            started = new AtomicBoolean(false);
+        }
+
+        @Override
+        public void run() {
+            while(true) {
+                // all currently available entries will be processed
+                int takenPermits = Math.max(1, counter.availablePermits());
+                try {
+                    counter.acquire(takenPermits);
+                }
+                catch (InterruptedException e) {
+                    continue;
+                }
+                Set<Entry<Future<Object>, CompletableFuture<Object >>> entrySet = futures.entrySet();
+                int processed = 0;
+                for (Entry<Future<Object>, CompletableFuture<Object>> entry : entrySet) {
+                    if(processed == takenPermits) {
+                        //we only release as many as we took permits. The remainder will be handled in the next iteration
+                        break;
+                    }
+                    Future< ? > future = entry.getKey();
+                    if(future.isDone()) {
+                        try {
+                            Object object = future.get();
+                            entry.getValue().complete(object);
+                        }
+                        catch (ExecutionException e) {
+                            entry.getValue().completeExceptionally(e.getCause());
+                        }
+                        catch (Exception e) {
+                            entry.getValue().completeExceptionally(e);
+                        }
+                        futures.remove(future);
+                        processed++;
+                    }
+                    else {
+                        // the future is not done yet, so its permit is released again for a later iteration
+                        counter.release();
+                    }
+                    try {
+                        Thread.sleep(20); // sleep a little to wait for additional futures to complete
+                    }
+                    catch (InterruptedException e) {
+                        // ignore and continue polling
+                    }
+                }
+            }
+        }
+
+        public CompletableFuture<Object> complete(Future<Object> future) {
+            if(started.compareAndSet(false, true)) {
+                start();
+            }
+            CompletableFuture<Object> completable = new CompletableFuture<>();
+            futures.put(future, completable);
+            counter.release();
+            return completable;
+        }
+    }
+}
+
+
+
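
`getResultType` in the strategy above reads the payload type from the generic return type, so that `CompletableFuture<String>` is serialized as `String`. A minimal sketch of that reflection step: the `Sample` interface and `resultTypeOf` helper are hypothetical, and this variant uses `instanceof` checks where the commit catches the exception from a failed cast.

```java
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

public class ResultTypeSketch {

    /** Hypothetical service interface with one parameterized and one raw async return type. */
    public interface Sample {
        CompletableFuture<String> typed();

        @SuppressWarnings("rawtypes")
        Future raw();
    }

    /** Returns the first type argument of the return type, or the plain return type as fallback. */
    public static Class<?> resultType(Method method) {
        Type type = method.getGenericReturnType();
        if (type instanceof ParameterizedType) {
            Type argument = ((ParameterizedType) type).getActualTypeArguments()[0];
            if (argument instanceof Class) {
                return (Class<?>) argument;
            }
        }
        // Raw types, wildcards and nested generics end up here.
        return method.getReturnType();
    }

    /** Convenience lookup on the hypothetical Sample interface. */
    public static Class<?> resultTypeOf(String methodName) {
        try {
            return resultType(Sample.class.getMethod(methodName));
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(resultTypeOf("typed").getName());
        System.out.println(resultTypeOf("raw").getName());
    }
}
```

A return type such as `Future<List<String>>` has a `ParameterizedType` as its argument, so both this sketch and the committed version fall back to the declared return type in that case.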

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/42087ca2/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/AsyncInvocationStrategy.java
----------------------------------------------------------------------
diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/AsyncInvocationStrategy.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/AsyncInvocationStrategy.java
index 77167b0..47ab6b1 100644
--- a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/AsyncInvocationStrategy.java
+++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/AsyncInvocationStrategy.java
@@ -33,21 +33,17 @@ import org.fusesource.hawtbuf.DataByteArrayInputStream;
 import org.fusesource.hawtbuf.DataByteArrayOutputStream;
 import org.fusesource.hawtdispatch.Dispatch;
 import org.fusesource.hawtdispatch.DispatchQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * <p>
  * </p>
  *
  */
-public class AsyncInvocationStrategy implements InvocationStrategy {
-
-    public static final AsyncInvocationStrategy INSTANCE = new AsyncInvocationStrategy();
-
-    static public boolean isAsyncMethod(Method method) {
-        Class<?>[] types = method.getParameterTypes();
-        return types.length != 0 && types[types.length - 1] == AsyncCallback.class;
-    }
+public class AsyncInvocationStrategy extends AbstractInvocationStrategy {
 
+    protected static final Logger LOGGER = LoggerFactory.getLogger(AsyncInvocationStrategy.class);
 
     private class AsyncResponseFuture implements ResponseFuture {
 
@@ -96,20 +92,25 @@ public class AsyncInvocationStrategy implements InvocationStrategy {
         }
     }
 
-    public ResponseFuture request(SerializationStrategy serializationStrategy, ClassLoader loader, Method method, Object[] args, DataByteArrayOutputStream target) throws Exception {
-        if(!isAsyncMethod(method)) {
-            throw new IllegalArgumentException("Invalid async method declaration: last argument is not a RequestCallback");
-        }
-
-        Class[] new_types = payloadTypes(method);
+    @Override
+    protected void encodeRequest(SerializationStrategy serializationStrategy, ClassLoader loader, Method method, Object[] args, DataByteArrayOutputStream requestStream) throws Exception {
+        Class<?>[] new_types = payloadTypes(method);
         Object[] new_args = new Object[args.length-1];
         System.arraycopy(args, 0, new_args, 0, new_args.length);
+        serializationStrategy.encodeRequest(loader, new_types, new_args, requestStream);
+    }
 
-        serializationStrategy.encodeRequest(loader, new_types, new_args, target);
-
+    @Override
+    protected ResponseFuture createResponse(SerializationStrategy serializationStrategy, ClassLoader loader, Method method, Object[] args) throws Exception {
         return new AsyncResponseFuture(loader, method, (AsyncCallback) args[args.length-1], serializationStrategy, Dispatch.getCurrentQueue());
     }
 
+    protected Class getResultType(Method method) {
+        Type[] types = method.getGenericParameterTypes();
+        ParameterizedType t = (ParameterizedType) types[types.length-1];
+        return (Class) t.getActualTypeArguments()[0];
+    }
+
     static private Class<?>[] payloadTypes(Method method) {
         Class<?>[] types = method.getParameterTypes();
         Class<?>[] new_types = new Class<?>[types.length-1];
@@ -117,57 +118,9 @@ public class AsyncInvocationStrategy implements InvocationStrategy {
         return new_types;
     }
 
-    static private Class getResultType(Method method) {
-        Type[] types = method.getGenericParameterTypes();
-        ParameterizedType t = (ParameterizedType) types[types.length-1];
-        return (Class) t.getActualTypeArguments()[0];
-    }
-
-
-    class ServiceResponse {
+    protected void doService(SerializationStrategy serializationStrategy, ClassLoader loader, Method method, Object target, DataByteArrayInputStream requestStream, final DataByteArrayOutputStream responseStream, final Runnable onComplete) {
 
-        private final ClassLoader loader;
-        private final Method method;
-        private final DataByteArrayOutputStream responseStream;
-        private final Runnable onComplete;
-        private final SerializationStrategy serializationStrategy;
-        private final int pos;
-        // Used to protect against sending multiple responses.
-        final AtomicBoolean responded = new AtomicBoolean(false);
-
-        public ServiceResponse(ClassLoader loader, Method method, DataByteArrayOutputStream responseStream, Runnable onComplete, SerializationStrategy serializationStrategy) {
-            this.loader = loader;
-            this.method = method;
-            this.responseStream = responseStream;
-            this.onComplete = onComplete;
-            this.serializationStrategy = serializationStrategy;
-            pos = responseStream.position();
-        }
-
-        public void send(Throwable error, Object value) {
-            if( responded.compareAndSet(false, true) ) {
-                Class resultType = getResultType(method);
-                try {
-                    serializationStrategy.encodeResponse(loader, resultType, value, error, responseStream);
-                } catch (Exception e) {
-                    // we failed to encode the response.. reposition and write that error.
-                    try {
-                        responseStream.position(pos);
-                        serializationStrategy.encodeResponse(loader, resultType, value, new RemoteException(e.toString()), responseStream);
-                    } catch (Exception unexpected) {
-                        unexpected.printStackTrace();
-                    }
-                } finally {
-                    onComplete.run();
-                }
-            }
-        }
-
-
-    }
-    public void service(SerializationStrategy serializationStrategy, ClassLoader loader, Method method, Object target, DataByteArrayInputStream requestStream, final DataByteArrayOutputStream responseStream, final Runnable onComplete) {
-
-        final ServiceResponse helper = new ServiceResponse(loader, method, responseStream, onComplete, serializationStrategy);
+        final AsyncServiceResponse helper = new AsyncServiceResponse(loader, method, responseStream, onComplete, serializationStrategy);
         try {
 
             Object[] new_args = new Object[method.getParameterTypes().length];
@@ -188,4 +141,5 @@ public class AsyncInvocationStrategy implements InvocationStrategy {
 
     }
 
+
 }

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/42087ca2/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/AsyncPromiseInvocationStrategy.java
----------------------------------------------------------------------
diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/AsyncPromiseInvocationStrategy.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/AsyncPromiseInvocationStrategy.java
new file mode 100644
index 0000000..ec2cd26
--- /dev/null
+++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/AsyncPromiseInvocationStrategy.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.rsa.provider.fastbin.tcp;
+
+import java.lang.reflect.Method;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.aries.rsa.provider.fastbin.api.AsyncCallback;
+import org.apache.aries.rsa.provider.fastbin.api.SerializationStrategy;
+import org.fusesource.hawtbuf.DataByteArrayInputStream;
+import org.fusesource.hawtbuf.DataByteArrayOutputStream;
+import org.fusesource.hawtdispatch.Dispatch;
+import org.fusesource.hawtdispatch.DispatchQueue;
+import org.osgi.util.promise.Deferred;
+import org.osgi.util.promise.Promise;
+
+public class AsyncPromiseInvocationStrategy extends AbstractInvocationStrategy {
+
+    @SuppressWarnings("unchecked")
+    protected void doService(SerializationStrategy serializationStrategy, ClassLoader loader, Method method, Object target, DataByteArrayInputStream requestStream, final DataByteArrayOutputStream responseStream, final Runnable onComplete) {
+
+        final AsyncServiceResponse helper = new AsyncServiceResponse(loader, method, responseStream, onComplete, serializationStrategy);
+        try {
+            Class<?>[] types = method.getParameterTypes();
+            final Object[] args = new Object[types.length];
+            serializationStrategy.decodeRequest(loader, types, requestStream, args);
+            final Promise<Object> promise = (Promise<Object>)method.invoke(target, args);
+            promise.onResolve(() -> {
+                try{
+                    helper.send(promise.getFailure(), promise.getFailure()==null ? promise.getValue() : null);
+                }
+                catch (Exception e){
+                    helper.send(e, null);
+                }
+            });
+
+        } catch (Throwable t) {
+            helper.send(t, null);
+        }
+    }
+
+
+    @Override
+    protected ResponseFuture createResponse(SerializationStrategy serializationStrategy, ClassLoader loader, Method method, Object[] args) throws Exception {
+        return new AsyncResponseFuture(loader, method, serializationStrategy, Dispatch.getCurrentQueue());
+    }
+
+    protected Class getResultType(Method method) {
+        try {
+            Type type = method.getGenericReturnType();
+            ParameterizedType t = (ParameterizedType) type;
+            return (Class) t.getActualTypeArguments()[0];
+        }
+        catch (Exception e) {
+            return super.getResultType(method);
+        }
+    }
+
+    @SuppressWarnings({"rawtypes"})
+    private class AsyncResponseFuture implements ResponseFuture, AsyncCallback {
+
+        private final ClassLoader loader;
+        private final Method method;
+        private final SerializationStrategy serializationStrategy;
+        private final DispatchQueue queue;
+        private Deferred<Object> deferred;
+
+        public AsyncResponseFuture(ClassLoader loader, Method method, SerializationStrategy serializationStrategy, DispatchQueue queue) {
+            this.loader = loader;
+            this.method = method;
+            this.serializationStrategy = serializationStrategy;
+            this.queue = queue;
+            this.deferred = new Deferred<>();
+        }
+
+        public void set(final DataByteArrayInputStream source) {
+            if( queue != null ) {
+                queue.execute(new Runnable() {
+                    public void run() {
+                        decodeIt(source);
+                    }
+                });
+            } else {
+                decodeIt(source);
+            }
+        }
+
+        private void decodeIt(DataByteArrayInputStream source) {
+            try {
+                serializationStrategy.decodeResponse(loader, getResultType(method), source, this);
+            } catch (Throwable e) {
+                onFailure(e);
+            }
+        }
+
+        @Override
+        public void fail(Throwable throwable) {
+
+            onFailure(throwable);
+        }
+
+        @Override
+        public void onSuccess(Object result) {
+            deferred.resolve(result);
+        }
+
+        @Override
+        public void onFailure(Throwable failure) {
+            deferred.fail(failure);
+        }
+
+        @Override
+        public Object get(long timeout, TimeUnit unit) throws Exception
+        {
+            return deferred.getPromise();
+        }
+    }
+}
+
+
+
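
The getResultType override above reads the type argument from the method's generic return type. A minimal standalone sketch of that reflection step follows, using java.util.concurrent.Future in place of Promise so that it needs only the JDK. The class ResultTypeSketch, its Sample interface, and the fallback to the declared return type are assumptions made for illustration; the commit itself falls back to super.getResultType(method), whose behavior is not shown in this diff.

```java
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;
import java.util.concurrent.Future;

// Hypothetical illustration class; not part of the commit.
public class ResultTypeSketch {

    // Hypothetical service interface used only to exercise the lookup.
    public interface Sample {
        Future<String> name();
        Future<List<String>> names();
        @SuppressWarnings("rawtypes")
        Future rawName();
        String plain();
    }

    // Returns the first type argument of a parameterized return type,
    // or the declared return type when no usable argument is present.
    public static Class<?> resultType(Method method) {
        Type type = method.getGenericReturnType();
        if (type instanceof ParameterizedType) {
            Type arg = ((ParameterizedType) type).getActualTypeArguments()[0];
            if (arg instanceof Class) {
                return (Class<?>) arg;
            }
            if (arg instanceof ParameterizedType) {
                // Nested generics such as Future<List<String>>: use the raw type.
                return (Class<?>) ((ParameterizedType) arg).getRawType();
            }
        }
        // Assumption: fall back to the declared return type.
        return method.getReturnType();
    }

    public static void main(String[] args) throws Exception {
        for (String name : new String[] {"name", "names", "rawName", "plain"}) {
            System.out.println(name + " -> " + resultType(Sample.class.getMethod(name)));
        }
    }
}
```

Checking with instanceof instead of catching the ClassCastException keeps the nested-generic case (Future<List<String>>) from silently taking the fallback path.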

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/42087ca2/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/BlockingInvocationStrategy.java
----------------------------------------------------------------------
diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/BlockingInvocationStrategy.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/BlockingInvocationStrategy.java
index bbda2ee..5190157 100644
--- a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/BlockingInvocationStrategy.java
+++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/BlockingInvocationStrategy.java
@@ -29,16 +29,17 @@ import org.apache.aries.rsa.provider.fastbin.api.AsyncCallback;
 import org.apache.aries.rsa.provider.fastbin.api.SerializationStrategy;
 import org.fusesource.hawtbuf.DataByteArrayInputStream;
 import org.fusesource.hawtbuf.DataByteArrayOutputStream;
-import org.fusesource.hawtdispatch.Dispatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * <p>
  * </p>
  *
  */
-public class BlockingInvocationStrategy implements InvocationStrategy {
+public class BlockingInvocationStrategy extends AbstractInvocationStrategy {
 
-    public static final BlockingInvocationStrategy INSTANCE = new BlockingInvocationStrategy();
+    protected static final Logger LOGGER = LoggerFactory.getLogger(BlockingInvocationStrategy.class);
 
     private static final Callable<Object> EMPTY_CALLABLE = new Callable<Object>() {
         public Object call() {
@@ -80,15 +81,13 @@ public class BlockingInvocationStrategy implements InvocationStrategy {
         }
     }
 
-    public ResponseFuture request(SerializationStrategy serializationStrategy, ClassLoader loader, Method method, Object[] args, DataByteArrayOutputStream target) throws Exception {
 
-        assert Dispatch.getCurrentQueue() == null : "You should not do blocking RPC class when executing on a dispatch queue";
-
-        serializationStrategy.encodeRequest(loader, method.getParameterTypes(), args, target);
+    @Override
+    protected ResponseFuture createResponse(SerializationStrategy serializationStrategy, ClassLoader loader, Method method, Object[] args) throws Exception {
         return new BlockingResponseFuture(loader, method, serializationStrategy);
     }
 
-    public void service(SerializationStrategy serializationStrategy, ClassLoader loader, Method method, Object target, DataByteArrayInputStream requestStream, DataByteArrayOutputStream responseStream, Runnable onComplete) {
+    public void doService(SerializationStrategy serializationStrategy, ClassLoader loader, Method method, Object target, DataByteArrayInputStream requestStream, DataByteArrayOutputStream responseStream, Runnable onComplete) {
 
         int pos = responseStream.position();
         try {
@@ -113,12 +112,13 @@ public class BlockingInvocationStrategy implements InvocationStrategy {
 
         } catch(Exception e) {
 
+            LOGGER.warn("Initial Encoding response for method "+method+" failed. Retrying",e);
             // we failed to encode the response.. reposition and write that error.
             try {
                 responseStream.position(pos);
                 serializationStrategy.encodeResponse(loader, method.getReturnType(), null, new RemoteException(e.toString()), responseStream);
             } catch (Exception unexpected) {
-                unexpected.printStackTrace();
+                LOGGER.error("Error while servicing "+method,unexpected);
             }
 
         } finally {

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/42087ca2/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/ClientInvokerImpl.java
----------------------------------------------------------------------
diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/ClientInvokerImpl.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/ClientInvokerImpl.java
index 877b297..ba0a12b 100644
--- a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/ClientInvokerImpl.java
+++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/ClientInvokerImpl.java
@@ -199,12 +199,7 @@ public class ClientInvokerImpl implements ClientInvoker, Dispatched {
                 serializationStrategy = ObjectSerializationStrategy.INSTANCE;
             }
 
-            final InvocationStrategy strategy;
-            if( AsyncInvocationStrategy.isAsyncMethod(method) ) {
-                strategy = AsyncInvocationStrategy.INSTANCE;
-            } else {
-                strategy = BlockingInvocationStrategy.INSTANCE;
-            }
+            final InvocationStrategy strategy = InvocationType.forMethod(method);
 
             rc = new MethodData(strategy, serializationStrategy, signature);
             synchronized (method_cache) {

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/42087ca2/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/InvocationType.java
----------------------------------------------------------------------
diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/InvocationType.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/InvocationType.java
new file mode 100644
index 0000000..d296e03
--- /dev/null
+++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/InvocationType.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.rsa.provider.fastbin.tcp;
+
+import java.lang.reflect.Method;
+import java.util.concurrent.Future;
+
+import org.apache.aries.rsa.provider.fastbin.api.AsyncCallback;
+import org.osgi.util.promise.Promise;
+
+public enum InvocationType
+{
+    ASYNC_FUTURE(new AsyncFutureInvocationStrategy()){
+
+        @Override
+        protected boolean applies(Method method) {
+            Class<?> returnType = method.getReturnType();
+            if(returnType != null) {
+                return Future.class.isAssignableFrom(returnType);
+            }
+            return false;
+        }
+
+    }, ASYNC_CALLBACK(new AsyncInvocationStrategy()){
+
+        @Override
+        protected boolean applies(Method method) {
+            Class<?>[] types = method.getParameterTypes();
+            return types.length != 0 && types[types.length - 1] == AsyncCallback.class;
+        }
+
+    }, PROMISE(new AsyncPromiseInvocationStrategy()){
+
+        @Override
+        protected boolean applies(Method method) {
+            if(!promiseAvailable)
+                return false;
+            Class<?> returnType = method.getReturnType();
+            if(returnType != null) {
+                return Promise.class.isAssignableFrom(returnType);
+            }
+            return false;
+        }
+
+    }, BLOCKING(new BlockingInvocationStrategy()){
+
+        @Override
+        protected boolean applies(Method method) {
+            return true;
+        }
+    };
+
+    private InvocationStrategy strategy;
+    /**
+     * the dependency to OSGi promise is optional. This flag
+     * tracks if the class is visible or not
+     */
+    private static boolean promiseAvailable;
+
+    private InvocationType(InvocationStrategy strategy) {
+        this.strategy = strategy;
+    }
+
+
+    public static InvocationStrategy forMethod(Method method) {
+        InvocationType[] values = values();
+        for (InvocationType invocationType : values) {
+            if(invocationType.applies(method)) {
+                return invocationType.strategy;
+            }
+        }
+        return null;
+    }
+
+
+    protected abstract boolean applies(Method method);
+
+    static {
+        try{
+            String name = Promise.class.getName();
+            // if we make it here, the class is available
+            promiseAvailable = true;
+        } catch (Throwable t) {
+            promiseAvailable = false;
+        }
+    }
+}
+
+
+
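
InvocationType.forMethod walks the enum constants in declaration order and returns the strategy of the first one whose applies(method) is true, so the catch-all BLOCKING constant has to come last. The sketch below reproduces only that first-match dispatch with JDK types. DispatchSketch, Kind, Callback, and Sample are hypothetical names introduced here, and the predicate-in-constructor form is a stylistic substitute for the abstract applies method used in the commit.

```java
import java.lang.reflect.Method;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.function.Predicate;

// Hypothetical illustration class; not part of the commit.
public class DispatchSketch {

    // Stand-in for a callback parameter type (assumption, not the fastbin AsyncCallback).
    public interface Callback<T> {
        void done(T value);
    }

    // Constants are tested in declaration order; the catch-all must be last.
    public enum Kind {
        FUTURE(m -> Future.class.isAssignableFrom(m.getReturnType())),
        CALLBACK(m -> {
            Class<?>[] types = m.getParameterTypes();
            return types.length != 0 && types[types.length - 1] == Callback.class;
        }),
        BLOCKING(m -> true);

        private final Predicate<Method> applies;

        Kind(Predicate<Method> applies) {
            this.applies = applies;
        }

        public static Kind forMethod(Method method) {
            for (Kind kind : values()) {
                if (kind.applies.test(method)) {
                    return kind;
                }
            }
            // Not reachable while BLOCKING matches every method.
            throw new IllegalStateException("no kind applies to " + method);
        }
    }

    // Hypothetical service interface covering the three shapes.
    public interface Sample {
        CompletableFuture<String> helloAsync();
        void hello(String name, Callback<String> callback);
        String helloBlocking();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(Kind.forMethod(Sample.class.getMethod("helloAsync")));
        System.out.println(Kind.forMethod(Sample.class.getMethod("hello", String.class, Callback.class)));
        System.out.println(Kind.forMethod(Sample.class.getMethod("helloBlocking")));
    }
}
```

Because CompletableFuture implements Future, the isAssignableFrom check covers both return types with a single constant.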

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/42087ca2/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/ServerInvokerImpl.java
----------------------------------------------------------------------
diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/ServerInvokerImpl.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/ServerInvokerImpl.java
index ee29fe4..c365a56 100644
--- a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/ServerInvokerImpl.java
+++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/ServerInvokerImpl.java
@@ -120,12 +120,7 @@ public class ServerInvokerImpl implements ServerInvoker, Dispatched {
                 }
 
 
-                final InvocationStrategy invocationStrategy;
-                if( AsyncInvocationStrategy.isAsyncMethod(method) ) {
-                    invocationStrategy = AsyncInvocationStrategy.INSTANCE;
-                } else {
-                    invocationStrategy = BlockingInvocationStrategy.INSTANCE;
-                }
+                final InvocationStrategy invocationStrategy = InvocationType.forMethod(method);
 
                 rc = new MethodData(invocationStrategy, serializationStrategy, method);
                 method_cache.put(data, rc);

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/42087ca2/provider/fastbin/src/test/java/org/apache/aries/rsa/provider/fastbin/FutureInvocationTest.java
----------------------------------------------------------------------
diff --git a/provider/fastbin/src/test/java/org/apache/aries/rsa/provider/fastbin/FutureInvocationTest.java b/provider/fastbin/src/test/java/org/apache/aries/rsa/provider/fastbin/FutureInvocationTest.java
new file mode 100644
index 0000000..be1b53f
--- /dev/null
+++ b/provider/fastbin/src/test/java/org/apache/aries/rsa/provider/fastbin/FutureInvocationTest.java
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.rsa.provider.fastbin;
+
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Proxy;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.aries.rsa.provider.fastbin.InvocationTest.HelloImpl;
+import org.apache.aries.rsa.provider.fastbin.api.SerializationStrategy;
+import org.apache.aries.rsa.provider.fastbin.io.ServerInvoker;
+import org.apache.aries.rsa.provider.fastbin.tcp.ClientInvokerImpl;
+import org.apache.aries.rsa.provider.fastbin.tcp.ServerInvokerImpl;
+import org.fusesource.hawtdispatch.Dispatch;
+import org.fusesource.hawtdispatch.DispatchQueue;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+
+public class FutureInvocationTest
+{
+
+    private ServerInvokerImpl server;
+    private ClientInvokerImpl client;
+    private TestService testService;
+
+
+    @Before
+    public void setup() throws Exception
+    {
+        DispatchQueue queue = Dispatch.createQueue();
+        HashMap<String, SerializationStrategy> map = new HashMap<String, SerializationStrategy>();
+        server = new ServerInvokerImpl("tcp://localhost:0", queue, map);
+        server.start();
+
+        client = new ClientInvokerImpl(queue, map);
+        client.start();
+//        server.stop();
+        server.registerService("service-id", new ServerInvoker.ServiceFactory()
+        {
+            public Object get()
+            {
+                return new TestServiceImpl();
+            }
+
+
+            public void unget()
+            {}
+        }, TestServiceImpl.class.getClassLoader());
+
+        InvocationHandler handler = client.getProxy(server.getConnectAddress(), "service-id", TestServiceImpl.class.getClassLoader());
+        testService = (TestService)Proxy.newProxyInstance(HelloImpl.class.getClassLoader(), new Class[]{TestService.class}, handler);
+    }
+
+
+    @After
+    public void tearDown()
+    {
+        server.stop();
+        client.stop();
+    }
+
+
+
+    @Test
+    public void testInvokeCompletableFuture() throws Exception {
+        assertEquals("Hello",testService.helloAsync().get(5, TimeUnit.SECONDS));
+    }
+
+    @Test
+    public void testInvokeCompletableFutureManyThreads() throws Exception {
+        int threadCount = 20;
+        ExecutorService executor = Executors.newFixedThreadPool(threadCount);
+        Callable<String> task = () -> testService.helloAsync().get();
+        List<Callable<String>> tasks = new ArrayList<>();
+        tasks.addAll(Collections.nCopies(threadCount, task));
+        List<Future<String>> results = new ArrayList<>();
+        for (Callable<String> single : tasks) {
+            results.add(executor.submit(single));
+        }
+        assertEquals(threadCount, results.size());
+        for (Future<String> future : results)
+        {
+            assertEquals("Hello",future.get());
+        }
+    }
+
+
+
+    @Test
+    public void testInvokeFuture() throws Exception {
+        assertEquals("Hello",testService.helloAsyncStandardFuture().get(500, TimeUnit.SECONDS));
+    }
+
+    @Test
+    public void testInvokeFutureManyThreads() throws Exception {
+        int threadCount = 20;
+        ExecutorService executor = Executors.newFixedThreadPool(threadCount);
+        Callable<String> task = () -> testService.helloAsyncStandardFuture().get();
+        List<Callable<String>> tasks = new ArrayList<>();
+        tasks.addAll(Collections.nCopies(threadCount, task));
+        List<Future<String>> results = new ArrayList<>();
+        for (Callable<String> single : tasks)
+        {
+            results.add(executor.submit(single));
+        }
+        assertEquals(threadCount, results.size());
+        for (Future<String> future : results)
+        {
+            assertEquals("Hello",future.get());
+        }
+    }
+
+    @Test
+    public void testInvokeFutureExceptionally() throws Exception {
+
+        CompletableFuture<String> future = testService.exceptionAsync();
+        try{
+            future.get(5, TimeUnit.SECONDS);
+        } catch (ExecutionException e) {
+            assertTrue(e.getCause() instanceof IOException);
+            assertEquals("test", e.getCause().getMessage());
+        }
+    }
+
+
+    public interface TestService
+    {
+        CompletableFuture<String> helloAsync();
+
+        Future<String> helloAsyncStandardFuture();
+
+        CompletableFuture<String> exceptionAsync() throws IOException;
+    }
+
+    public class TestServiceImpl implements TestService {
+
+        @Override
+        public CompletableFuture<String> helloAsync() {
+            return CompletableFuture.supplyAsync(() -> "Hello");
+        }
+
+        @Override
+        public CompletableFuture<String> exceptionAsync() throws IOException {
+             CompletableFuture f = CompletableFuture.supplyAsync(() -> {
+                 sleep(500);
+                 return  "Hello";
+             });
+             f.completeExceptionally(new IOException("test"));
+             return f;
+        }
+
+        private void sleep(long time) {
+            try {
+                Thread.sleep(time);
+            }
+            catch (InterruptedException e) {
+                //NOOP
+            }
+        }
+
+        @Override
+        public Future<String> helloAsyncStandardFuture() {
+            return Executors.newSingleThreadExecutor().submit(() -> {
+                sleep(500);
+                return  "Hello";
+            });
+        }
+    }
+}
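
testInvokeFutureExceptionally relies on Future.get wrapping the failure in an ExecutionException whose cause is the original exception. The sketch below shows that JDK behavior in isolation, without any fastbin classes. FutureFailureSketch and its two helper methods are hypothetical names used only for illustration.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration class; not part of the commit.
public class FutureFailureSketch {

    // Builds a future that is already failed with an IOException("test").
    public static CompletableFuture<String> failing() {
        CompletableFuture<String> future = new CompletableFuture<>();
        future.completeExceptionally(new IOException("test"));
        return future;
    }

    // Returns the cause reported by get(), or null if the future completed normally.
    public static Throwable causeOf(CompletableFuture<?> future) throws Exception {
        try {
            future.get(5, TimeUnit.SECONDS);
            return null;
        } catch (ExecutionException e) {
            return e.getCause();
        }
    }

    public static void main(String[] args) throws Exception {
        Throwable cause = causeOf(failing());
        System.out.println(cause.getClass().getName() + ": " + cause.getMessage());
    }
}
```

Returning null for the non-failing case makes a missing exception observable to the caller, which a bare try/catch around get() would not.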

http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/42087ca2/provider/fastbin/src/test/java/org/apache/aries/rsa/provider/fastbin/PromiseInvocationTest.java
----------------------------------------------------------------------
diff --git a/provider/fastbin/src/test/java/org/apache/aries/rsa/provider/fastbin/PromiseInvocationTest.java b/provider/fastbin/src/test/java/org/apache/aries/rsa/provider/fastbin/PromiseInvocationTest.java
new file mode 100644
index 0000000..7835f1f
--- /dev/null
+++ b/provider/fastbin/src/test/java/org/apache/aries/rsa/provider/fastbin/PromiseInvocationTest.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.rsa.provider.fastbin;
+
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Proxy;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.aries.rsa.provider.fastbin.InvocationTest.HelloImpl;
+import org.apache.aries.rsa.provider.fastbin.api.SerializationStrategy;
+import org.apache.aries.rsa.provider.fastbin.io.ServerInvoker;
+import org.apache.aries.rsa.provider.fastbin.tcp.ClientInvokerImpl;
+import org.apache.aries.rsa.provider.fastbin.tcp.ServerInvokerImpl;
+import org.fusesource.hawtdispatch.Dispatch;
+import org.fusesource.hawtdispatch.DispatchQueue;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.osgi.util.promise.Deferred;
+import org.osgi.util.promise.Promise;
+
+
+public class PromiseInvocationTest
+{
+
+    private ServerInvokerImpl server;
+    private ClientInvokerImpl client;
+    private TestService testService;
+
+
+    @Before
+    public void setup() throws Exception
+    {
+        DispatchQueue queue = Dispatch.createQueue();
+        HashMap<String, SerializationStrategy> map = new HashMap<String, SerializationStrategy>();
+        server = new ServerInvokerImpl("tcp://localhost:0", queue, map);
+        server.start();
+
+        client = new ClientInvokerImpl(queue, map);
+        client.start();
+//        server.stop();
+        server.registerService("service-id", new ServerInvoker.ServiceFactory()
+        {
+            public Object get()
+            {
+                return new TestServiceImpl();
+            }
+
+
+            public void unget()
+            {}
+        }, TestServiceImpl.class.getClassLoader());
+
+        InvocationHandler handler = client.getProxy(server.getConnectAddress(), "service-id", TestServiceImpl.class.getClassLoader());
+        testService = (TestService)Proxy.newProxyInstance(HelloImpl.class.getClassLoader(), new Class[]{TestService.class}, handler);
+    }
+
+
+    @After
+    public void tearDown()
+    {
+        server.stop();
+        client.stop();
+    }
+
+
+
+    @Test
+    public void testInvoke() throws Exception {
+        assertEquals("Hello",testService.helloPromise().getValue());
+    }
+
+    @Test
+    public void testInvokeManyThreads() throws Exception {
+        int threadCount = 20;
+        ExecutorService executor = Executors.newFixedThreadPool(threadCount);
+        Callable<String> task = () -> testService.helloPromise().getValue();
+        List<Callable<String>> tasks = new ArrayList<>();
+        tasks.addAll(Collections.nCopies(threadCount, task));
+        List<Future<String>> results = new ArrayList<>();
+        for (Callable<String> single : tasks) {
+            results.add(executor.submit(single));
+        }
+        assertEquals(threadCount, results.size());
+        for (Future<String> future : results)
+        {
+            assertEquals("Hello",future.get());
+        }
+    }
+
+    @Test
+    public void testInvokeFutureExceptionally() throws Exception {
+
+        Promise<String> promise = testService.exceptionPromise();
+        try{
+            promise.getValue();
+            fail("Must throw an exception");
+        } catch (InvocationTargetException e) {
+            assertTrue(e.getTargetException() instanceof IOException);
+            assertEquals("test", e.getCause().getMessage());
+            assertTrue(promise.getFailure() instanceof IOException);
+        }
+    }
+
+
+    public interface TestService
+    {
+        Promise<String> helloPromise();
+
+        Promise<String> exceptionPromise() throws IOException;
+    }
+
+    public class TestServiceImpl implements TestService {
+
+        @Override
+        public Promise<String> helloPromise() {
+            final Deferred<String> deferred = new Deferred<String>();
+            new Thread(() -> deferred.resolve("Hello")).start();
+            return deferred.getPromise();
+        }
+
+        @Override
+        public Promise<String> exceptionPromise() throws IOException {
+            final Deferred<String> deferred = new Deferred<String>();
+            new Thread(() -> {
+                sleep(500);
+                deferred.fail(new IOException("test"));
+            }).start();
+             return deferred.getPromise();
+        }
+
+        private void sleep(long time) {
+            try {
+                Thread.sleep(time);
+            }
+            catch (InterruptedException e) {
+                //NOOP
+            }
+        }
+    }
+}
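
The many-threads tests in this commit submit the same Callable from a fixed pool and then collect every result. A compact variant of that fan-out pattern is sketched below using ExecutorService.invokeAll, with the pool shut down afterwards. FanOutSketch and callConcurrently are hypothetical names, and the task here is a plain lambda rather than a remote fastbin call.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical illustration class; not part of the commit.
public class FanOutSketch {

    // Runs the same task on threadCount threads and returns all results.
    public static List<String> callConcurrently(Callable<String> task, int threadCount) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(threadCount);
        try {
            // invokeAll blocks until every task has completed.
            List<Future<String>> futures = executor.invokeAll(Collections.nCopies(threadCount, task));
            List<String> results = new ArrayList<>();
            for (Future<String> future : futures) {
                results.add(future.get());
            }
            return results;
        } finally {
            // Release the pool threads once the calls are done.
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(callConcurrently(() -> "Hello", 20).size());
    }
}
```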
