Repository: aries-rsa Updated Branches: refs/heads/master fe14770a7 -> 42087ca27
[ARIES-1586] Async calls in fastbin remote methods can now return Future/CompletableFuture/Promise for async invocation. Also adds a Readme.md for fastbin closes #13 Project: http://git-wip-us.apache.org/repos/asf/aries-rsa/repo Commit: http://git-wip-us.apache.org/repos/asf/aries-rsa/commit/42087ca2 Tree: http://git-wip-us.apache.org/repos/asf/aries-rsa/tree/42087ca2 Diff: http://git-wip-us.apache.org/repos/asf/aries-rsa/diff/42087ca2 Branch: refs/heads/master Commit: 42087ca275ad804ee8ff2fda97b6a5b480e3fd06 Parents: fe14770 Author: Johannes Utzig <[email protected]> Authored: Mon Jul 25 12:55:22 2016 +0200 Committer: Christian Schneider <[email protected]> Committed: Thu Aug 4 14:57:05 2016 +0200 ---------------------------------------------------------------------- Readme.md | 22 +- provider/fastbin/Readme.md | 23 ++ provider/fastbin/pom.xml | 15 ++ .../fastbin/tcp/AbstractInvocationStrategy.java | 132 +++++++++++ .../tcp/AsyncFutureInvocationStrategy.java | 232 +++++++++++++++++++ .../fastbin/tcp/AsyncInvocationStrategy.java | 86 ++----- .../tcp/AsyncPromiseInvocationStrategy.java | 139 +++++++++++ .../fastbin/tcp/BlockingInvocationStrategy.java | 18 +- .../provider/fastbin/tcp/ClientInvokerImpl.java | 7 +- .../provider/fastbin/tcp/InvocationType.java | 106 +++++++++ .../provider/fastbin/tcp/ServerInvokerImpl.java | 7 +- .../provider/fastbin/FutureInvocationTest.java | 201 ++++++++++++++++ .../provider/fastbin/PromiseInvocationTest.java | 169 ++++++++++++++ 13 files changed, 1067 insertions(+), 90 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/42087ca2/Readme.md ---------------------------------------------------------------------- diff --git a/Readme.md b/Readme.md index 3b282e5..cc76e26 100644 --- a/Readme.md +++ b/Readme.md @@ -1,8 +1,24 @@ # Aries Remote Service Admin (RSA) The [Aries Remote Service Admin (RSA)](http://aries.apache.org/modules/rsa.html) project allows to transparently use OSGi -services for remote communication. OSGi services can be marked for export by adding a service property +services for remote communication. OSGi services can be marked for export by adding a service property service.exported.interfaces=*. Various other properties can be used to customize how the service is to be exposed. -For more information, check out section "122 JPA Service Specification Version 1.0" in the "OSGi Service Platform - Enterprise Specification, Release 4, Version 4.2" available for public download from the OSGi Alliance. +For more information, check out section "13 Remote Services Version 1.0" in the "OSGi Service Platform + Service Compendium, Release 4, Version 4.2" available for public download from the OSGi Alliance. + +## Distribution Provider + +Aries Remote Service Admin provides two different transport layers out of the box and can be extended with custom transports. +Please refer to their individual Readme.me on how to use them. + + * `aries.tcp` - A very lightweight TCP based transport that is ideal to get a first demo running and to serve as template for custom distribution providers + * `aries.fastbin` - A fast binary transport that uses multiplexing on a pool of java nio channels. Fastbin supports both sync and long running async calls (via Future/Promise) + +## Discovery Provider + +The discovery providers are responsible for finding the available endpoint descriptions of remote services. Aries RSA provides three different implementations and can be extended with custom discovery providers. The three available implementations are + + * zookeeper - Manages endpoint descriptions as zookeeper nodes. + * local - Scans bundles for endpoint descriptions + * config - Reads endpoint descriptions from ConfigAdmin service \ No newline at end of file http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/42087ca2/provider/fastbin/Readme.md ---------------------------------------------------------------------- diff --git a/provider/fastbin/Readme.md b/provider/fastbin/Readme.md new file mode 100644 index 0000000..2997a1b --- /dev/null +++ b/provider/fastbin/Readme.md @@ -0,0 +1,23 @@ +# Fastbin transport provider + +Allows transparent remoting using Java Serialization over TCP. The fastbin provider uses a pool of nio tcp channels to transport data. +It can use either java object serialization or protobuf to serialize parameters and return values. +Sync remote calls have a default timeout of 5 minutes. For long running operations async calls should be used. This is indicated by having either + + * `Future` + * `CompletableFuture` + * `Promise` + +as the return value of the remote method. The client will receive a proxy of that type that will be resolved async as soon as the server finished computation. + + +## Endpoint Configuration + +service.exported.configs: aries.fastbin + +| Key | Default | Description | +| -------------------------| --------------------- | -------------------------------------------------------- | +| uri | tcp://0.0.0.0:2543 | The bind address to use | +| exportAddress | looks up the hostname | The ip/hostname how remote clients can reach this server | +| timeout | 300000 | The timeout for sync calls (default 5 minutes) | + http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/42087ca2/provider/fastbin/pom.xml ---------------------------------------------------------------------- diff --git a/provider/fastbin/pom.xml b/provider/fastbin/pom.xml index 59e5417..5e1a5b9 100644 --- a/provider/fastbin/pom.xml +++ b/provider/fastbin/pom.xml @@ -37,11 +37,26 @@ <artifactId>hawtbuf-proto</artifactId> <optional>true</optional> </dependency> + <!-- this is only needed you you want to use the Promise for remote services --> + <dependency> + <groupId>org.osgi</groupId> + <artifactId>org.osgi.util.promise</artifactId> + <version>1.0.0</version> + <optional>true</optional> + </dependency> </dependencies> <build> <plugins> <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <configuration> + <source>1.8</source> + <target>1.8</target> + </configuration> + </plugin> + <plugin> <groupId>org.fusesource.hawtbuf</groupId> <artifactId>hawtbuf-protoc</artifactId> <version>${hawtbuf.version}</version> http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/42087ca2/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/AbstractInvocationStrategy.java ---------------------------------------------------------------------- diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/AbstractInvocationStrategy.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/AbstractInvocationStrategy.java new file mode 100644 index 0000000..75f06a9 --- /dev/null +++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/AbstractInvocationStrategy.java @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.aries.rsa.provider.fastbin.tcp; + +import java.lang.reflect.Method; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.aries.rsa.provider.fastbin.api.SerializationStrategy; +import org.fusesource.hawtbuf.DataByteArrayInputStream; +import org.fusesource.hawtbuf.DataByteArrayOutputStream; +import org.osgi.framework.ServiceException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class AbstractInvocationStrategy implements InvocationStrategy +{ + + protected final static Logger LOGGER = LoggerFactory.getLogger(AbstractInvocationStrategy.class); + + @Override + public ResponseFuture request(SerializationStrategy serializationStrategy, ClassLoader loader, Method method, Object[] args, DataByteArrayOutputStream requestStream) throws Exception { + encodeRequest(serializationStrategy, loader, method, args, requestStream); + return createResponse(serializationStrategy, loader,method, args); + } + + /** + * encodes the request to the stream + * @param serializationStrategy + * @param loader + * @param method + * @param args + * @param requestStream + * @param protocolVersion + * @throws Exception + */ + protected void encodeRequest(SerializationStrategy serializationStrategy, ClassLoader loader, Method method, Object[] args, DataByteArrayOutputStream requestStream) throws Exception { + serializationStrategy.encodeRequest(loader, method.getParameterTypes(), args, requestStream); + } + + /** + * creates a response for the remote method call + * @param serializationStrategy + * @param loader + * @param method + * @param args + * @return + * @throws Exception + */ + protected abstract ResponseFuture createResponse(SerializationStrategy serializationStrategy, ClassLoader loader, Method method, Object[] args) throws Exception; + + + @Override + public final void service(SerializationStrategy serializationStrategy, ClassLoader loader, Method method, Object target, DataByteArrayInputStream requestStream, DataByteArrayOutputStream responseStream, Runnable onComplete) { + doService(serializationStrategy, loader, method, target, requestStream, responseStream, onComplete); + + } + + /** + * performs the actual remote call using the provided parameters + * @param serializationStrategy the strategy to serialize the objects with + * @param loader the classloader to use + * @param method the method to call + * @param target the object to call the method on + * @param requestStream + * @param responseStream + * @param onComplete to be executed after the call has finished + */ + protected abstract void doService(SerializationStrategy serializationStrategy, ClassLoader loader, Method method, Object target, DataByteArrayInputStream requestStream, DataByteArrayOutputStream responseStream, Runnable onComplete); + + protected Class getResultType(Method method) { + return method.getReturnType(); + } + + protected class AsyncServiceResponse { + + private final ClassLoader loader; + private final Method method; + private final DataByteArrayOutputStream responseStream; + private final Runnable onComplete; + private final SerializationStrategy serializationStrategy; + private final int pos; + // Used to protect against sending multiple responses. + final AtomicBoolean responded = new AtomicBoolean(false); + + public AsyncServiceResponse(ClassLoader loader, Method method, DataByteArrayOutputStream responseStream, Runnable onComplete, SerializationStrategy serializationStrategy) { + this.loader = loader; + this.method = method; + this.responseStream = responseStream; + this.onComplete = onComplete; + this.serializationStrategy = serializationStrategy; + pos = responseStream.position(); + } + + public void send(Throwable error, Object value) { + if( responded.compareAndSet(false, true) ) { + Class resultType = getResultType(method); + try { + serializationStrategy.encodeResponse(loader, resultType, value, error, responseStream); + } catch (Exception e) { + // we failed to encode the response.. reposition and write that error. + try { + responseStream.position(pos); + serializationStrategy.encodeResponse(loader, resultType, value, new ServiceException(e.toString()), responseStream); + } catch (Exception unexpected) { + LOGGER.error("Error while servicing "+method,unexpected); + } + } finally { + onComplete.run(); + } + } + } + } +} + + + http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/42087ca2/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/AsyncFutureInvocationStrategy.java ---------------------------------------------------------------------- diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/AsyncFutureInvocationStrategy.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/AsyncFutureInvocationStrategy.java new file mode 100644 index 0000000..a46faf4 --- /dev/null +++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/AsyncFutureInvocationStrategy.java @@ -0,0 +1,232 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.aries.rsa.provider.fastbin.tcp; + +import java.lang.reflect.Method; +import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BiConsumer; + +import org.apache.aries.rsa.provider.fastbin.api.AsyncCallback; +import org.apache.aries.rsa.provider.fastbin.api.SerializationStrategy; +import org.fusesource.hawtbuf.DataByteArrayInputStream; +import org.fusesource.hawtbuf.DataByteArrayOutputStream; +import org.fusesource.hawtdispatch.Dispatch; +import org.fusesource.hawtdispatch.DispatchQueue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AsyncFutureInvocationStrategy extends AbstractInvocationStrategy { + + private FutureCompleter completer = new FutureCompleter(); + + @SuppressWarnings("unchecked") + protected void doService(SerializationStrategy serializationStrategy, ClassLoader loader, Method method, Object target, DataByteArrayInputStream requestStream, final DataByteArrayOutputStream responseStream, final Runnable onComplete) { + + final AsyncServiceResponse helper = new AsyncServiceResponse(loader, method, responseStream, onComplete, serializationStrategy); + try { + Class<?>[] types = method.getParameterTypes(); + final Object[] args = new Object[types.length]; + serializationStrategy.decodeRequest(loader, types, requestStream, args); + Future<Object> future = (Future<Object>)method.invoke(target, args); + CompletableFuture<Object> completable = null; + if(future instanceof CompletableFuture) { + completable = (CompletableFuture<Object>)future; + } + else { + completable = completer.complete(future); + } + completable.whenComplete(new BiConsumer<Object, Throwable>() { + public void accept(Object returnValue, Throwable exception) { + helper.send(exception, returnValue); + }; + }); + + } catch (Throwable t) { + helper.send(t, null); + } + } + + + @Override + protected ResponseFuture createResponse(SerializationStrategy serializationStrategy, ClassLoader loader, Method method, Object[] args) throws Exception { + return new AsyncResponseFuture(loader, method, serializationStrategy, Dispatch.getCurrentQueue()); + } + + protected Class getResultType(Method method) { + try { + Type type = method.getGenericReturnType(); + ParameterizedType t = (ParameterizedType) type; + return (Class) t.getActualTypeArguments()[0]; + } + catch (Exception e) { + return super.getResultType(method); + } + } + + @SuppressWarnings({"rawtypes"}) + private class AsyncResponseFuture implements ResponseFuture, AsyncCallback { + + private final ClassLoader loader; + private final Method method; + private final SerializationStrategy serializationStrategy; + private final DispatchQueue queue; + private CompletableFuture<Object> future; + + public AsyncResponseFuture(ClassLoader loader, Method method, SerializationStrategy serializationStrategy, DispatchQueue queue) { + this.loader = loader; + this.method = method; + this.serializationStrategy = serializationStrategy; + this.queue = queue; + this.future = new CompletableFuture<>(); + } + + public void set(final DataByteArrayInputStream source) { + if( queue != null ) { + queue.execute(new Runnable() { + public void run() { + decodeIt(source); + } + }); + } else { + decodeIt(source); + } + } + + private void decodeIt(DataByteArrayInputStream source) { + try { + serializationStrategy.decodeResponse(loader, getResultType(method), source, this); + } catch (Throwable e) { + onFailure(e); + } + } + + @Override + public void fail(Throwable throwable) { + + onFailure(throwable); + } + + @Override + public void onSuccess(Object result) { + future.complete(result); + } + + @Override + public void onFailure(Throwable failure) { + future.completeExceptionally(failure); + } + + @Override + public Object get(long timeout, TimeUnit unit) throws Exception + { + return future; + } + } + + /** + * Helper class that polls available futures in a background thread for readiness + * and reports them to a completable future + * + */ + private static class FutureCompleter extends Thread { + + private ConcurrentMap<Future<Object>, CompletableFuture<Object>> futures; + private Semaphore counter; + private AtomicBoolean started; + + public FutureCompleter() { + setName("Fastbin-Future-Completer"); + setDaemon(true); + futures = new ConcurrentHashMap<>(); + counter = new Semaphore(0); + started = new AtomicBoolean(false); + } + + @Override + public void run() { + while(true) { + // all currently available entries will be processed + int takenPermits = Math.max(1, counter.availablePermits()); + try { + counter.acquire(takenPermits); + } + catch (InterruptedException e) { + continue; + } + Set<Entry<Future<Object>, CompletableFuture<Object >>> entrySet = futures.entrySet(); + int processed = 0; + for (Entry<Future<Object>, CompletableFuture<Object>> entry : entrySet) { + if(processed == takenPermits) { + //we only release as many as we took permits. The remainder will be handled in the next iteration + break; + } + Future< ? > future = entry.getKey(); + if(future.isDone()) { + try { + Object object = future.get(); + entry.getValue().complete(object); + } + catch (ExecutionException e) { + entry.getValue().completeExceptionally(e.getCause()); + } + catch (Exception e) { + entry.getValue().completeExceptionally(e); + } + futures.remove(future); + processed++; + } + else { + // if the future is complete, the permit is not released + counter.release(); + } + try { + Thread.sleep(20); + } + catch (InterruptedException e) { + // sleep a little to wait for additional futures to complete + } + } + } + } + + public CompletableFuture<Object> complete(Future<Object> future) { + if(started.compareAndSet(false, true)) { + start(); + } + CompletableFuture<Object> completable = new CompletableFuture<>(); + futures.put(future, completable); + counter.release(); + return completable; + } + } +} + + + http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/42087ca2/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/AsyncInvocationStrategy.java ---------------------------------------------------------------------- diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/AsyncInvocationStrategy.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/AsyncInvocationStrategy.java index 77167b0..47ab6b1 100644 --- a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/AsyncInvocationStrategy.java +++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/AsyncInvocationStrategy.java @@ -33,21 +33,17 @@ import org.fusesource.hawtbuf.DataByteArrayInputStream; import org.fusesource.hawtbuf.DataByteArrayOutputStream; import org.fusesource.hawtdispatch.Dispatch; import org.fusesource.hawtdispatch.DispatchQueue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * <p> * </p> * */ -public class AsyncInvocationStrategy implements InvocationStrategy { - - public static final AsyncInvocationStrategy INSTANCE = new AsyncInvocationStrategy(); - - static public boolean isAsyncMethod(Method method) { - Class<?>[] types = method.getParameterTypes(); - return types.length != 0 && types[types.length - 1] == AsyncCallback.class; - } +public class AsyncInvocationStrategy extends AbstractInvocationStrategy { + protected static final Logger LOGGER = LoggerFactory.getLogger(AsyncInvocationStrategy.class); private class AsyncResponseFuture implements ResponseFuture { @@ -96,20 +92,25 @@ public class AsyncInvocationStrategy implements InvocationStrategy { } } - public ResponseFuture request(SerializationStrategy serializationStrategy, ClassLoader loader, Method method, Object[] args, DataByteArrayOutputStream target) throws Exception { - if(!isAsyncMethod(method)) { - throw new IllegalArgumentException("Invalid async method declaration: last argument is not a RequestCallback"); - } - - Class[] new_types = payloadTypes(method); + @Override + protected void encodeRequest(SerializationStrategy serializationStrategy, ClassLoader loader, Method method, Object[] args, DataByteArrayOutputStream requestStream) throws Exception { + Class<?>[] new_types = payloadTypes(method); Object[] new_args = new Object[args.length-1]; System.arraycopy(args, 0, new_args, 0, new_args.length); + serializationStrategy.encodeRequest(loader, new_types, new_args, requestStream); + } - serializationStrategy.encodeRequest(loader, new_types, new_args, target); - + @Override + protected ResponseFuture createResponse(SerializationStrategy serializationStrategy, ClassLoader loader, Method method, Object[] args) throws Exception { return new AsyncResponseFuture(loader, method, (AsyncCallback) args[args.length-1], serializationStrategy, Dispatch.getCurrentQueue()); } + protected Class getResultType(Method method) { + Type[] types = method.getGenericParameterTypes(); + ParameterizedType t = (ParameterizedType) types[types.length-1]; + return (Class) t.getActualTypeArguments()[0]; + } + static private Class<?>[] payloadTypes(Method method) { Class<?>[] types = method.getParameterTypes(); Class<?>[] new_types = new Class<?>[types.length-1]; @@ -117,57 +118,9 @@ public class AsyncInvocationStrategy implements InvocationStrategy { return new_types; } - static private Class getResultType(Method method) { - Type[] types = method.getGenericParameterTypes(); - ParameterizedType t = (ParameterizedType) types[types.length-1]; - return (Class) t.getActualTypeArguments()[0]; - } - - - class ServiceResponse { + protected void doService(SerializationStrategy serializationStrategy, ClassLoader loader, Method method, Object target, DataByteArrayInputStream requestStream, final DataByteArrayOutputStream responseStream, final Runnable onComplete) { - private final ClassLoader loader; - private final Method method; - private final DataByteArrayOutputStream responseStream; - private final Runnable onComplete; - private final SerializationStrategy serializationStrategy; - private final int pos; - // Used to protect against sending multiple responses. - final AtomicBoolean responded = new AtomicBoolean(false); - - public ServiceResponse(ClassLoader loader, Method method, DataByteArrayOutputStream responseStream, Runnable onComplete, SerializationStrategy serializationStrategy) { - this.loader = loader; - this.method = method; - this.responseStream = responseStream; - this.onComplete = onComplete; - this.serializationStrategy = serializationStrategy; - pos = responseStream.position(); - } - - public void send(Throwable error, Object value) { - if( responded.compareAndSet(false, true) ) { - Class resultType = getResultType(method); - try { - serializationStrategy.encodeResponse(loader, resultType, value, error, responseStream); - } catch (Exception e) { - // we failed to encode the response.. reposition and write that error. - try { - responseStream.position(pos); - serializationStrategy.encodeResponse(loader, resultType, value, new RemoteException(e.toString()), responseStream); - } catch (Exception unexpected) { - unexpected.printStackTrace(); - } - } finally { - onComplete.run(); - } - } - } - - - } - public void service(SerializationStrategy serializationStrategy, ClassLoader loader, Method method, Object target, DataByteArrayInputStream requestStream, final DataByteArrayOutputStream responseStream, final Runnable onComplete) { - - final ServiceResponse helper = new ServiceResponse(loader, method, responseStream, onComplete, serializationStrategy); + final AsyncServiceResponse helper = new AsyncServiceResponse(loader, method, responseStream, onComplete, serializationStrategy); try { Object[] new_args = new Object[method.getParameterTypes().length]; @@ -188,4 +141,5 @@ public class AsyncInvocationStrategy implements InvocationStrategy { } + } http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/42087ca2/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/AsyncPromiseInvocationStrategy.java ---------------------------------------------------------------------- diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/AsyncPromiseInvocationStrategy.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/AsyncPromiseInvocationStrategy.java new file mode 100644 index 0000000..ec2cd26 --- /dev/null +++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/AsyncPromiseInvocationStrategy.java @@ -0,0 +1,139 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.aries.rsa.provider.fastbin.tcp; + +import java.lang.reflect.Method; +import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; +import java.util.concurrent.TimeUnit; + +import org.apache.aries.rsa.provider.fastbin.api.AsyncCallback; +import org.apache.aries.rsa.provider.fastbin.api.SerializationStrategy; +import org.fusesource.hawtbuf.DataByteArrayInputStream; +import org.fusesource.hawtbuf.DataByteArrayOutputStream; +import org.fusesource.hawtdispatch.Dispatch; +import org.fusesource.hawtdispatch.DispatchQueue; +import org.osgi.util.promise.Deferred; +import org.osgi.util.promise.Promise; + +public class AsyncPromiseInvocationStrategy extends AbstractInvocationStrategy { + + @SuppressWarnings("unchecked") + protected void doService(SerializationStrategy serializationStrategy, ClassLoader loader, Method method, Object target, DataByteArrayInputStream requestStream, final DataByteArrayOutputStream responseStream, final Runnable onComplete) { + + final AsyncServiceResponse helper = new AsyncServiceResponse(loader, method, responseStream, onComplete, serializationStrategy); + try { + Class<?>[] types = method.getParameterTypes(); + final Object[] args = new Object[types.length]; + serializationStrategy.decodeRequest(loader, types, requestStream, args); + final Promise<Object> promise = (Promise<Object>)method.invoke(target, args); + promise.onResolve(() -> { + try{ + helper.send(promise.getFailure(), promise.getFailure()==null ? promise.getValue() : null); + } + catch (Exception e){ + helper.send(e, null); + } + }); + + } catch (Throwable t) { + helper.send(t, null); + } + } + + + @Override + protected ResponseFuture createResponse(SerializationStrategy serializationStrategy, ClassLoader loader, Method method, Object[] args) throws Exception { + return new AsyncResponseFuture(loader, method, serializationStrategy, Dispatch.getCurrentQueue()); + } + + protected Class getResultType(Method method) { + try { + Type type = method.getGenericReturnType(); + ParameterizedType t = (ParameterizedType) type; + return (Class) t.getActualTypeArguments()[0]; + } + catch (Exception e) { + return super.getResultType(method); + } + } + + @SuppressWarnings({"rawtypes"}) + private class AsyncResponseFuture implements ResponseFuture, AsyncCallback { + + private final ClassLoader loader; + private final Method method; + private final SerializationStrategy serializationStrategy; + private final DispatchQueue queue; + private Deferred<Object> deferred; + + public AsyncResponseFuture(ClassLoader loader, Method method, SerializationStrategy serializationStrategy, DispatchQueue queue) { + this.loader = loader; + this.method = method; + this.serializationStrategy = serializationStrategy; + this.queue = queue; + this.deferred = new Deferred<>(); + } + + public void set(final DataByteArrayInputStream source) { + if( queue != null ) { + queue.execute(new Runnable() { + public void run() { + decodeIt(source); + } + }); + } else { + decodeIt(source); + } + } + + private void decodeIt(DataByteArrayInputStream source) { + try { + serializationStrategy.decodeResponse(loader, getResultType(method), source, this); + } catch (Throwable e) { + onFailure(e); + } + } + + @Override + public void fail(Throwable throwable) { + + onFailure(throwable); + } + + @Override + public void onSuccess(Object result) { + deferred.resolve(result); + } + + @Override + public void onFailure(Throwable failure) { + deferred.fail(failure); + } + + @Override + public Object get(long timeout, TimeUnit unit) throws Exception + { + return deferred.getPromise(); + } + } +} + + + http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/42087ca2/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/BlockingInvocationStrategy.java ---------------------------------------------------------------------- diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/BlockingInvocationStrategy.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/BlockingInvocationStrategy.java index bbda2ee..5190157 100644 --- a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/BlockingInvocationStrategy.java +++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/BlockingInvocationStrategy.java @@ -29,16 +29,17 @@ import org.apache.aries.rsa.provider.fastbin.api.AsyncCallback; import org.apache.aries.rsa.provider.fastbin.api.SerializationStrategy; import org.fusesource.hawtbuf.DataByteArrayInputStream; import org.fusesource.hawtbuf.DataByteArrayOutputStream; -import org.fusesource.hawtdispatch.Dispatch; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * <p> * </p> * */ -public class BlockingInvocationStrategy implements InvocationStrategy { +public class BlockingInvocationStrategy extends AbstractInvocationStrategy { - public static final BlockingInvocationStrategy INSTANCE = new BlockingInvocationStrategy(); + protected static final Logger LOGGER = LoggerFactory.getLogger(BlockingInvocationStrategy.class); private static final Callable<Object> EMPTY_CALLABLE = new Callable<Object>() { public Object call() { @@ -80,15 +81,13 @@ public class BlockingInvocationStrategy implements InvocationStrategy { } } - public ResponseFuture request(SerializationStrategy serializationStrategy, ClassLoader loader, Method method, Object[] args, DataByteArrayOutputStream target) throws Exception { - assert Dispatch.getCurrentQueue() == null : "You should not do blocking RPC class when executing on a dispatch queue"; - - serializationStrategy.encodeRequest(loader, method.getParameterTypes(), args, target); + @Override + protected ResponseFuture createResponse(SerializationStrategy serializationStrategy, ClassLoader loader, Method method, Object[] args) throws Exception { return new BlockingResponseFuture(loader, method, serializationStrategy); } - public void service(SerializationStrategy serializationStrategy, ClassLoader loader, Method method, Object target, DataByteArrayInputStream requestStream, DataByteArrayOutputStream responseStream, Runnable onComplete) { + public void doService(SerializationStrategy serializationStrategy, ClassLoader loader, Method method, Object target, DataByteArrayInputStream requestStream, DataByteArrayOutputStream responseStream, Runnable onComplete) { int pos = responseStream.position(); try { @@ -113,12 +112,13 @@ public class BlockingInvocationStrategy implements InvocationStrategy { } catch(Exception e) { + LOGGER.warn("Initial Encoding response for method "+method+" failed. Retrying",e); // we failed to encode the response.. reposition and write that error. try { responseStream.position(pos); serializationStrategy.encodeResponse(loader, method.getReturnType(), null, new RemoteException(e.toString()), responseStream); } catch (Exception unexpected) { - unexpected.printStackTrace(); + LOGGER.error("Error while servicing "+method,unexpected); } } finally { http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/42087ca2/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/ClientInvokerImpl.java ---------------------------------------------------------------------- diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/ClientInvokerImpl.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/ClientInvokerImpl.java index 877b297..ba0a12b 100644 --- a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/ClientInvokerImpl.java +++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/ClientInvokerImpl.java @@ -199,12 +199,7 @@ public class ClientInvokerImpl implements ClientInvoker, Dispatched { serializationStrategy = ObjectSerializationStrategy.INSTANCE; } - final InvocationStrategy strategy; - if( AsyncInvocationStrategy.isAsyncMethod(method) ) { - strategy = AsyncInvocationStrategy.INSTANCE; - } else { - strategy = BlockingInvocationStrategy.INSTANCE; - } + final InvocationStrategy strategy = InvocationType.forMethod(method); rc = new MethodData(strategy, serializationStrategy, signature); synchronized (method_cache) { http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/42087ca2/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/InvocationType.java ---------------------------------------------------------------------- diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/InvocationType.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/InvocationType.java new file mode 100644 index 0000000..d296e03 --- /dev/null +++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/InvocationType.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.aries.rsa.provider.fastbin.tcp; + +import java.lang.reflect.Method; +import java.util.concurrent.Future; + +import org.apache.aries.rsa.provider.fastbin.api.AsyncCallback; +import org.osgi.util.promise.Promise; + +public enum InvocationType +{ + ASYNC_FUTURE(new AsyncFutureInvocationStrategy()){ + + @Override + protected boolean applies(Method method) { + Class<?> returnType = method.getReturnType(); + if(returnType != null) { + return Future.class.isAssignableFrom(returnType); + } + return false; + } + + }, ASYNC_CALLBACK(new AsyncInvocationStrategy()){ + + @Override + protected boolean applies(Method method) { + Class<?>[] types = method.getParameterTypes(); + return types.length != 0 && types[types.length - 1] == AsyncCallback.class; + } + + }, PROMISE(new AsyncPromiseInvocationStrategy()){ + + @Override + protected boolean applies(Method method) { + if(!promiseAvailable) + return false; + Class<?> returnType = method.getReturnType(); + if(returnType != null) { + return Promise.class.isAssignableFrom(returnType); + } + return false; + } + + }, BLOCKING(new BlockingInvocationStrategy()){ + + @Override + protected boolean applies(Method method) { + return true; + } + }; + + private InvocationStrategy strategy; + /** + * the dependency to OSGi promise is optional. This flag + * tracks if the class is visible or not + */ + private static boolean promiseAvailable; + + private InvocationType(InvocationStrategy strategy) { + this.strategy = strategy; + } + + + public static InvocationStrategy forMethod(Method method) { + InvocationType[] values = values(); + for (InvocationType invocationType : values) { + if(invocationType.applies(method)) { + return invocationType.strategy; + } + } + return null; + } + + + protected abstract boolean applies(Method method); + + static { + try{ + String name = Promise.class.getName(); + // if we make it here, the class is available + promiseAvailable = true; + } catch (Throwable t) { + promiseAvailable = false; + } + } +} + + + http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/42087ca2/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/ServerInvokerImpl.java ---------------------------------------------------------------------- diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/ServerInvokerImpl.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/ServerInvokerImpl.java index ee29fe4..c365a56 100644 --- a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/ServerInvokerImpl.java +++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/ServerInvokerImpl.java @@ -120,12 +120,7 @@ public class ServerInvokerImpl implements ServerInvoker, Dispatched { } - final InvocationStrategy invocationStrategy; - if( AsyncInvocationStrategy.isAsyncMethod(method) ) { - invocationStrategy = AsyncInvocationStrategy.INSTANCE; - } else { - invocationStrategy = BlockingInvocationStrategy.INSTANCE; - } + final InvocationStrategy invocationStrategy = InvocationType.forMethod(method); rc = new MethodData(invocationStrategy, serializationStrategy, method); method_cache.put(data, rc); http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/42087ca2/provider/fastbin/src/test/java/org/apache/aries/rsa/provider/fastbin/FutureInvocationTest.java ---------------------------------------------------------------------- diff --git a/provider/fastbin/src/test/java/org/apache/aries/rsa/provider/fastbin/FutureInvocationTest.java b/provider/fastbin/src/test/java/org/apache/aries/rsa/provider/fastbin/FutureInvocationTest.java new file mode 100644 index 0000000..be1b53f --- /dev/null +++ b/provider/fastbin/src/test/java/org/apache/aries/rsa/provider/fastbin/FutureInvocationTest.java @@ -0,0 +1,201 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.aries.rsa.provider.fastbin; + + +import static org.junit.Assert.*; + +import java.io.IOException; +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Proxy; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import org.apache.aries.rsa.provider.fastbin.InvocationTest.HelloImpl; +import org.apache.aries.rsa.provider.fastbin.api.SerializationStrategy; +import org.apache.aries.rsa.provider.fastbin.io.ServerInvoker; +import org.apache.aries.rsa.provider.fastbin.tcp.ClientInvokerImpl; +import org.apache.aries.rsa.provider.fastbin.tcp.ServerInvokerImpl; +import org.fusesource.hawtdispatch.Dispatch; +import org.fusesource.hawtdispatch.DispatchQueue; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + + +public class FutureInvocationTest +{ + + private ServerInvokerImpl server; + private ClientInvokerImpl client; + private TestService testService; + + + @Before + public void setup() throws Exception + { + DispatchQueue queue = Dispatch.createQueue(); + HashMap<String, SerializationStrategy> map = new HashMap<String, SerializationStrategy>(); + server = new ServerInvokerImpl("tcp://localhost:0", queue, map); + server.start(); + + client = new ClientInvokerImpl(queue, map); + client.start(); +// server.stop(); + server.registerService("service-id", new ServerInvoker.ServiceFactory() + { + public Object get() + { + return new TestServiceImpl(); + } + + + public void unget() + {} + }, TestServiceImpl.class.getClassLoader()); + + InvocationHandler handler = client.getProxy(server.getConnectAddress(), "service-id", TestServiceImpl.class.getClassLoader()); + testService = (TestService)Proxy.newProxyInstance(HelloImpl.class.getClassLoader(), new Class[]{TestService.class}, handler); + } + + + @After + public void tearDown() + { + server.stop(); + client.stop(); + } + + + + @Test + public void testInvokeCompletableFuture() throws Exception { + assertEquals("Hello",testService.helloAsync().get(5, TimeUnit.SECONDS)); + } + + @Test + public void testInvokeCompletableFutureManyThreads() throws Exception { + int threadCount = 20; + ExecutorService executor = Executors.newFixedThreadPool(threadCount); + Callable<String> task = () -> testService.helloAsync().get(); + List<Callable<String>> tasks = new ArrayList<>(); + tasks.addAll(Collections.nCopies(threadCount, task)); + List<Future<String>> results = new ArrayList<>(); + for (Callable<String> single : tasks) { + results.add(executor.submit(single)); + } + assertEquals(threadCount, results.size()); + for (Future<String> future : results) + { + assertEquals("Hello",future.get()); + } + } + + + + @Test + public void testInvokeFuture() throws Exception { + assertEquals("Hello",testService.helloAsyncStandardFuture().get(500, TimeUnit.SECONDS)); + } + + @Test + public void testInvokeFutureManyThreads() throws Exception { + int threadCount = 20; + ExecutorService executor = Executors.newFixedThreadPool(threadCount); + Callable<String> task = () -> testService.helloAsyncStandardFuture().get(); + List<Callable<String>> tasks = new ArrayList<>(); + tasks.addAll(Collections.nCopies(threadCount, task)); + List<Future<String>> results = new ArrayList<>(); + for (Callable<String> single : tasks) + { + results.add(executor.submit(single)); + } + assertEquals(threadCount, results.size()); + for (Future<String> future : results) + { + assertEquals("Hello",future.get()); + } + } + + @Test + public void testInvokeFutureExceptionally() throws Exception { + + CompletableFuture<String> future = testService.exceptionAsync(); + try{ + future.get(5, TimeUnit.SECONDS); + } catch (ExecutionException e) { + assertTrue(e.getCause() instanceof IOException); + assertEquals("test", e.getCause().getMessage()); + } + } + + + public interface TestService + { + CompletableFuture<String> helloAsync(); + + Future<String> helloAsyncStandardFuture(); + + CompletableFuture<String> exceptionAsync() throws IOException; + } + + public class TestServiceImpl implements TestService { + + @Override + public CompletableFuture<String> helloAsync() { + return CompletableFuture.supplyAsync(() -> "Hello"); + } + + @Override + public CompletableFuture<String> exceptionAsync() throws IOException { + CompletableFuture f = CompletableFuture.supplyAsync(() -> { + sleep(500); + return "Hello"; + }); + f.completeExceptionally(new IOException("test")); + return f; + } + + private void sleep(long time) { + try { + Thread.sleep(time); + } + catch (InterruptedException e) { + //NOOP + } + } + + @Override + public Future<String> helloAsyncStandardFuture() { + return Executors.newSingleThreadExecutor().submit(() -> { + sleep(500); + return "Hello"; + }); + } + } +} http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/42087ca2/provider/fastbin/src/test/java/org/apache/aries/rsa/provider/fastbin/PromiseInvocationTest.java ---------------------------------------------------------------------- diff --git a/provider/fastbin/src/test/java/org/apache/aries/rsa/provider/fastbin/PromiseInvocationTest.java b/provider/fastbin/src/test/java/org/apache/aries/rsa/provider/fastbin/PromiseInvocationTest.java new file mode 100644 index 0000000..7835f1f --- /dev/null +++ b/provider/fastbin/src/test/java/org/apache/aries/rsa/provider/fastbin/PromiseInvocationTest.java @@ -0,0 +1,169 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.aries.rsa.provider.fastbin; + + +import static org.junit.Assert.*; + +import java.io.IOException; +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Proxy; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import org.apache.aries.rsa.provider.fastbin.InvocationTest.HelloImpl; +import org.apache.aries.rsa.provider.fastbin.api.SerializationStrategy; +import org.apache.aries.rsa.provider.fastbin.io.ServerInvoker; +import org.apache.aries.rsa.provider.fastbin.tcp.ClientInvokerImpl; +import org.apache.aries.rsa.provider.fastbin.tcp.ServerInvokerImpl; +import org.fusesource.hawtdispatch.Dispatch; +import org.fusesource.hawtdispatch.DispatchQueue; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.osgi.util.promise.Deferred; +import org.osgi.util.promise.Promise; + + +public class PromiseInvocationTest +{ + + private ServerInvokerImpl server; + private ClientInvokerImpl client; + private TestService testService; + + + @Before + public void setup() throws Exception + { + DispatchQueue queue = Dispatch.createQueue(); + HashMap<String, SerializationStrategy> map = new HashMap<String, SerializationStrategy>(); + server = new ServerInvokerImpl("tcp://localhost:0", queue, map); + server.start(); + + client = new ClientInvokerImpl(queue, map); + client.start(); +// server.stop(); + server.registerService("service-id", new ServerInvoker.ServiceFactory() + { + public Object get() + { + return new TestServiceImpl(); + } + + + public void unget() + {} + }, TestServiceImpl.class.getClassLoader()); + + InvocationHandler handler = client.getProxy(server.getConnectAddress(), "service-id", TestServiceImpl.class.getClassLoader()); + testService = (TestService)Proxy.newProxyInstance(HelloImpl.class.getClassLoader(), new Class[]{TestService.class}, handler); + } + + + @After + public void tearDown() + { + server.stop(); + client.stop(); + } + + + + @Test + public void testInvoke() throws Exception { + assertEquals("Hello",testService.helloPromise().getValue()); + } + + @Test + public void testInvokeManyThreads() throws Exception { + int threadCount = 20; + ExecutorService executor = Executors.newFixedThreadPool(threadCount); + Callable<String> task = () -> testService.helloPromise().getValue(); + List<Callable<String>> tasks = new ArrayList<>(); + tasks.addAll(Collections.nCopies(threadCount, task)); + List<Future<String>> results = new ArrayList<>(); + for (Callable<String> single : tasks) { + results.add(executor.submit(single)); + } + assertEquals(threadCount, results.size()); + for (Future<String> future : results) + { + assertEquals("Hello",future.get()); + } + } + + @Test + public void testInvokeFutureExceptionally() throws Exception { + + Promise<String> promise = testService.exceptionPromise(); + try{ + promise.getValue(); + fail("Must throw an exception"); + } catch (InvocationTargetException e) { + assertTrue(e.getTargetException() instanceof IOException); + assertEquals("test", e.getCause().getMessage()); + assertTrue(promise.getFailure() instanceof IOException); + } + } + + + public interface TestService + { + Promise<String> helloPromise(); + + Promise<String> exceptionPromise() throws IOException; + } + + public class TestServiceImpl implements TestService { + + @Override + public Promise<String> helloPromise() { + final Deferred<String> deferred = new Deferred<String>(); + new Thread(() -> deferred.resolve("Hello")).start(); + return deferred.getPromise(); + } + + @Override + public Promise<String> exceptionPromise() throws IOException { + final Deferred<String> deferred = new Deferred<String>(); + new Thread(() -> { + sleep(500); + deferred.fail(new IOException("test")); + }).start(); + return deferred.getPromise(); + } + + private void sleep(long time) { + try { + Thread.sleep(time); + } + catch (InterruptedException e) { + //NOOP + } + } + } +}
