Repository: aries-rsa Updated Branches: refs/heads/master 2792d5d64 -> b87b66362
[ARIES-1632] fastbin does not throw an error for unknown services. When no service holder is available for a given ID, it falls back to the object serialization strategy and sends back a service exception. Project: http://git-wip-us.apache.org/repos/asf/aries-rsa/repo Commit: http://git-wip-us.apache.org/repos/asf/aries-rsa/commit/b87b6636 Tree: http://git-wip-us.apache.org/repos/asf/aries-rsa/tree/b87b6636 Diff: http://git-wip-us.apache.org/repos/asf/aries-rsa/diff/b87b6636 Branch: refs/heads/master Commit: b87b6636216a353419732f19e0ad146e751098df Parents: 2792d5d Author: Johannes Utzig <[email protected]> Authored: Thu Nov 17 11:08:30 2016 +0100 Committer: Johannes Utzig <[email protected]> Committed: Thu Nov 17 11:32:24 2016 +0100 ---------------------------------------------------------------------- .../fastbin/tcp/AbstractInvocationStrategy.java | 29 +++++ .../fastbin/tcp/InvocationStrategy.java | 13 +++ .../provider/fastbin/tcp/ServerInvokerImpl.java | 105 +++++++++++++------ .../rsa/provider/fastbin/InvocationTest.java | 77 ++++++++++++++ 4 files changed, 193 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/b87b6636/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/AbstractInvocationStrategy.java ---------------------------------------------------------------------- diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/AbstractInvocationStrategy.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/AbstractInvocationStrategy.java index 2e9937a..c2ab04f 100644 --- a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/AbstractInvocationStrategy.java +++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/AbstractInvocationStrategy.java @@ -103,10 +103,39 @@ public abstract class AbstractInvocationStrategy implements InvocationStrategy 
@Override public final void service(SerializationStrategy serializationStrategy, ClassLoader loader, Method method, Object target, DataByteArrayInputStream requestStream, DataByteArrayOutputStream responseStream, Runnable onComplete) { + if(method==null && target instanceof ServiceException) { + handleInvalidRequest(serializationStrategy, loader, method, target, responseStream, onComplete); + return; + } doService(serializationStrategy, loader, method, target, requestStream, responseStream, onComplete); } + protected void handleInvalidRequest(SerializationStrategy serializationStrategy, ClassLoader loader, Method method, Object target, DataByteArrayOutputStream responseStream, Runnable onComplete) { + //client made an invalid request + int pos = responseStream.position(); + try { + + Object value = null; + Throwable error = (Throwable)target; + serializationStrategy.encodeResponse(loader, null, value, error, responseStream); + + } catch(Exception e) { + + LOGGER.warn("Initial Encoding response for method "+method+" failed. Retrying",e); + // we failed to encode the response.. reposition and write that error. 
+ try { + responseStream.position(pos); + serializationStrategy.encodeResponse(loader, null, null, new ServiceException(e.toString()), responseStream); + } catch (Exception unexpected) { + LOGGER.error("Error while servicing "+method,unexpected); + } + + } finally { + onComplete.run(); + } + } + /** * performs the actual remote call using the provided parameters * @param serializationStrategy the strategy to serialize the objects with http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/b87b6636/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/InvocationStrategy.java ---------------------------------------------------------------------- diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/InvocationStrategy.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/InvocationStrategy.java index d05ad88..024bc53 100644 --- a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/InvocationStrategy.java +++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/InvocationStrategy.java @@ -33,5 +33,18 @@ public interface InvocationStrategy { public ResponseFuture request(SerializationStrategy serializationStrategy, ClassLoader loader, Method method, Object[] args, DataByteArrayOutputStream requestStream) throws Exception; + /** + * handles the actual remote call. + * <p> + * if method is <code>null</code> and target is a <code>ServiceException</code> it is treated as an indication that the method lookup failed. 
+ * In such a case the strategy will send the service exception to the caller + * @param serializationStrategy + * @param loader + * @param method + * @param target + * @param requestStream + * @param responseStream + * @param onComplete + */ void service(SerializationStrategy serializationStrategy, ClassLoader loader, Method method, Object target, DataByteArrayInputStream requestStream, DataByteArrayOutputStream responseStream, Runnable onComplete); } http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/b87b6636/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/ServerInvokerImpl.java ---------------------------------------------------------------------- diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/ServerInvokerImpl.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/ServerInvokerImpl.java index d56d64c..1dd58f9 100644 --- a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/ServerInvokerImpl.java +++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/ServerInvokerImpl.java @@ -46,6 +46,7 @@ import org.fusesource.hawtbuf.DataByteArrayInputStream; import org.fusesource.hawtbuf.DataByteArrayOutputStream; import org.fusesource.hawtbuf.UTF8Buffer; import org.fusesource.hawtdispatch.DispatchQueue; +import org.osgi.framework.ServiceException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -243,41 +244,83 @@ public class ServerInvokerImpl implements ServerInvoker, Dispatched { final Buffer encoded_method = readBuffer(bais); final ServiceFactoryHolder holder = holders.get(service); - final MethodData methodData = holder.getMethodData(encoded_method); - - final Object svc = holder.factory.get(); + Runnable task = null; + if(holder==null) { + LOGGER.warn("The requested service {"+service+"} is not available"); + task = new Runnable() { + public void run() { + + final DataByteArrayOutputStream baos = new 
DataByteArrayOutputStream(); + try { + baos.writeInt(0); // make space for the size field. + baos.writeVarLong(correlation); + } catch (IOException e) { // should not happen + LOGGER.error("Failed to write to buffer",e); + throw new RuntimeException(e); + } - Runnable task = new Runnable() { - public void run() { + // Lets decode the remaining args on the target's executor + // to take cpu load off the + BlockingInvocationStrategy strategy = new BlockingInvocationStrategy(); + strategy.service(ObjectSerializationStrategy.INSTANCE, getClass().getClassLoader(), null, new ServiceException("The requested service {"+service+"} is not available"), bais, baos, new Runnable() { + + public void run() { + final Buffer command = baos.toBuffer(); + + // Update the size field. + BufferEditor editor = command.buffer().bigEndianEditor(); + editor.writeInt(command.length); + + queue().execute(new Runnable() { + public void run() { + transport.offer(command); + } + }); + } + }); + } + }; + } + final Object svc = holder==null ? null : holder.factory.get(); + if(holder!=null) + { + final MethodData methodData = holder.getMethodData(encoded_method); + + + task = new Runnable() { + public void run() { + + final DataByteArrayOutputStream baos = new DataByteArrayOutputStream(); + try { + baos.writeInt(0); // make space for the size field. + baos.writeVarLong(correlation); + } catch (IOException e) { // should not happen + LOGGER.error("Failed to write to buffer",e); + throw new RuntimeException(e); + } - final DataByteArrayOutputStream baos = new DataByteArrayOutputStream(); - try { - baos.writeInt(0); // make space for the size field. 
- baos.writeVarLong(correlation); - } catch (IOException e) { // should not happen - throw new RuntimeException(e); + // Lets decode the remaining args on the target's executor + // to take cpu load off the + methodData.invocationStrategy.service(methodData.serializationStrategy, holder.loader, methodData.method, svc, bais, baos, new Runnable() { + public void run() { + holder.factory.unget(); + final Buffer command = baos.toBuffer(); + + // Update the size field. + BufferEditor editor = command.buffer().bigEndianEditor(); + editor.writeInt(command.length); + + queue().execute(new Runnable() { + public void run() { + transport.offer(command); + } + }); + } + }); } + }; - // Lets decode the remaining args on the target's executor - // to take cpu load off the - methodData.invocationStrategy.service(methodData.serializationStrategy, holder.loader, methodData.method, svc, bais, baos, new Runnable() { - public void run() { - holder.factory.unget(); - final Buffer command = baos.toBuffer(); - - // Update the size field. 
- BufferEditor editor = command.buffer().bigEndianEditor(); - editor.writeInt(command.length); - - queue().execute(new Runnable() { - public void run() { - transport.offer(command); - } - }); - } - }); - } - }; + } Executor executor; if( svc instanceof Dispatched ) { http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/b87b6636/provider/fastbin/src/test/java/org/apache/aries/rsa/provider/fastbin/InvocationTest.java ---------------------------------------------------------------------- diff --git a/provider/fastbin/src/test/java/org/apache/aries/rsa/provider/fastbin/InvocationTest.java b/provider/fastbin/src/test/java/org/apache/aries/rsa/provider/fastbin/InvocationTest.java index 7725375..647d8b6 100644 --- a/provider/fastbin/src/test/java/org/apache/aries/rsa/provider/fastbin/InvocationTest.java +++ b/provider/fastbin/src/test/java/org/apache/aries/rsa/provider/fastbin/InvocationTest.java @@ -28,6 +28,9 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -45,6 +48,7 @@ import org.apache.aries.rsa.provider.fastbin.test.StringValue; import org.fusesource.hawtdispatch.Dispatch; import org.fusesource.hawtdispatch.DispatchQueue; import org.junit.Test; +import org.osgi.framework.ServiceException; public class InvocationTest { final static long MILLIS_IN_A_NANO = TimeUnit.MILLISECONDS.toNanos(1); @@ -109,6 +113,57 @@ public class InvocationTest { } } + /** + * tests that requests to an unknown ID throw an exception instead of deadlocking the request + * @throws Exception + */ + @Test(timeout=30*1000) + public void testInvokeInvalidServiceID() throws Exception { + + DispatchQueue queue = Dispatch.createQueue(); + HashMap<String, 
SerializationStrategy> map = new HashMap<String, SerializationStrategy>(); + map.put("protobuf", new ProtobufSerializationStrategy()); + + ServerInvokerImpl server = new ServerInvokerImpl("tcp://localhost:0", queue, map); + server.start(); + + ClientInvokerImpl client = new ClientInvokerImpl(queue, map); + client.start(); + + try { + server.registerService("service-id", new ServerInvoker.ServiceFactory() { + public Object get() { + return new Hello2Impl(); + } + public void unget() { + } + }, Hello2Impl.class.getClassLoader()); + + + InvocationHandler handler = client.getProxy(server.getConnectAddress(), "service-id-broken", HelloImpl.class.getClassLoader()); + Hello2 hello = (Hello2) Proxy.newProxyInstance(Hello2Impl.class.getClassLoader(), new Class[] { Hello2.class }, handler); + + try{ + hello.hello("Fabric"); + fail("The service id does not exist, so this must fail"); + } catch (ServiceException e) { + assertNotNull(e.getMessage()); + } + try{ + hello.helloAsync("World").get(); + fail("The service id does not exist, so this must fail"); + } catch (ExecutionException e) { + assertTrue(e.getCause() instanceof ServiceException); + assertNotNull(e.getCause().getMessage()); + } + } + finally { + server.stop(); + client.stop(); + } + } + + @Test public void testObjectMethods() throws Exception { @@ -619,5 +674,27 @@ public class InvocationTest { } } + public interface Hello2 { + + public String hello(String name); + + public Future<String> helloAsync(String name); + } + + public class Hello2Impl implements Hello2 { + + @Override + public String hello(String name) { + return "Hello "+name; + } + + @Override + public Future<String> helloAsync(final String name) { + ExecutorService executor = Executors.newSingleThreadExecutor(); + return executor.submit(() -> { + return hello(name); + }); + } + } }
