Prototyping the JAX-RS proxy async support
Project: http://git-wip-us.apache.org/repos/asf/cxf/repo Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/471f8851 Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/471f8851 Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/471f8851 Branch: refs/heads/master-jaxrs-2.1 Commit: 471f8851f5cb1af293276eb312aa0744114a0a9b Parents: e6d42f6 Author: Sergey Beryozkin <[email protected]> Authored: Tue Jun 7 17:42:07 2016 +0100 Committer: Sergey Beryozkin <[email protected]> Committed: Tue Jun 7 17:42:07 2016 +0100 ---------------------------------------------------------------------- .../apache/cxf/common/util/PrimitiveUtils.java | 18 +++ .../apache/cxf/jaxrs/client/AbstractClient.java | 100 +++++++++++++- .../cxf/jaxrs/client/ClientProxyImpl.java | 86 +++++++++++- .../org/apache/cxf/jaxrs/client/WebClient.java | 133 +++---------------- .../cxf/systest/jaxrs/JAXRSAsyncClientTest.java | 40 ++++++ 5 files changed, 264 insertions(+), 113 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cxf/blob/471f8851/core/src/main/java/org/apache/cxf/common/util/PrimitiveUtils.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/cxf/common/util/PrimitiveUtils.java b/core/src/main/java/org/apache/cxf/common/util/PrimitiveUtils.java index c641b50..16408d8 100644 --- a/core/src/main/java/org/apache/cxf/common/util/PrimitiveUtils.java +++ b/core/src/main/java/org/apache/cxf/common/util/PrimitiveUtils.java @@ -19,12 +19,30 @@ package org.apache.cxf.common.util; +import java.util.HashMap; +import java.util.Map; + public final class PrimitiveUtils { + private static final Map<Class<?>, Class<?>> AUTOBOXED_PRIMITIVES_MAP; + static { + AUTOBOXED_PRIMITIVES_MAP = new HashMap<Class<?>, Class<?>>(); + AUTOBOXED_PRIMITIVES_MAP.put(byte.class, Byte.class); + AUTOBOXED_PRIMITIVES_MAP.put(short.class, Short.class); + AUTOBOXED_PRIMITIVES_MAP.put(int.class, Integer.class); + AUTOBOXED_PRIMITIVES_MAP.put(long.class, Long.class); + AUTOBOXED_PRIMITIVES_MAP.put(float.class, Float.class); + AUTOBOXED_PRIMITIVES_MAP.put(double.class, Double.class); + AUTOBOXED_PRIMITIVES_MAP.put(boolean.class, Boolean.class); + } private PrimitiveUtils() { } + public static boolean canPrimitiveTypeBeAutoboxed(Class<?> primitiveClass, Class<?> type) { + return primitiveClass.isPrimitive() && type == AUTOBOXED_PRIMITIVES_MAP.get(primitiveClass); + } + public static Class<?> getClass(String value) { Class<?> clz = null; if ("int".equals(value)) { http://git-wip-us.apache.org/repos/asf/cxf/blob/471f8851/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/AbstractClient.java ---------------------------------------------------------------------- diff --git a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/AbstractClient.java b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/AbstractClient.java index d98c34d..f9d9aec 100644 --- a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/AbstractClient.java +++ b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/AbstractClient.java @@ -24,7 +24,9 @@ import java.io.InputStream; import java.io.OutputStream; import java.lang.annotation.Annotation; import java.lang.reflect.Constructor; +import java.lang.reflect.ParameterizedType; import java.lang.reflect.Type; +import java.lang.reflect.TypeVariable; import java.net.HttpURLConnection; import java.net.URI; import java.text.SimpleDateFormat; @@ -43,6 +45,7 @@ import java.util.logging.Logger; import javax.ws.rs.ProcessingException; import javax.ws.rs.WebApplicationException; +import javax.ws.rs.client.InvocationCallback; import javax.ws.rs.core.Cookie; import javax.ws.rs.core.EntityTag; import javax.ws.rs.core.Form; @@ -1014,7 +1017,7 @@ public abstract class AbstractClient implements Client { = CastUtils.cast((Map<?, ?>)outMessage.get(Message.INVOCATION_CONTEXT)); return CastUtils.cast((Map<?, ?>)invContext.get(REQUEST_CONTEXT)); } - + protected List<?> getContentsList(Object body) { return body == null ? new MessageContentsList() : new MessageContentsList(body); } @@ -1067,6 +1070,50 @@ public abstract class AbstractClient implements Client { outMessage.getExchange().put("org.apache.cxf.resource.operation.name", name); } + protected static Type getCallbackType(InvocationCallback<?> callback) { + Class<?> cls = callback.getClass(); + ParameterizedType pt = findCallbackType(cls); + Type actualType = null; + for (Type tp : pt.getActualTypeArguments()) { + actualType = tp; + break; + } + if (actualType instanceof TypeVariable) { + actualType = InjectionUtils.getSuperType(cls, (TypeVariable<?>)actualType); + } + return actualType; + } + + protected static ParameterizedType findCallbackType(Class<?> cls) { + if (cls == null || cls == Object.class) { + return null; + } + for (Type c2 : cls.getGenericInterfaces()) { + if (c2 instanceof ParameterizedType) { + ParameterizedType pt = (ParameterizedType)c2; + if (InvocationCallback.class.equals(pt.getRawType())) { + return pt; + } + } + } + return findCallbackType(cls.getSuperclass()); + } + + protected static Class<?> getCallbackClass(Type outType) { + Class<?> respClass = null; + if (outType instanceof Class) { + respClass = (Class<?>)outType; + } else if (outType instanceof ParameterizedType) { + ParameterizedType pt = (ParameterizedType)outType; + if (pt.getRawType() instanceof Class) { + respClass = (Class<?>)pt.getRawType(); + } + } else if (outType == null) { + respClass = Response.class; + } + return respClass; + } + protected abstract class AbstractBodyWriter extends AbstractOutDatabindingInterceptor { public AbstractBodyWriter() { @@ -1133,4 +1180,55 @@ public abstract class AbstractClient implements Client { } } + protected abstract class AbstractClientAsyncResponseInterceptor extends AbstractPhaseInterceptor<Message> { + AbstractClientAsyncResponseInterceptor() { + super(Phase.UNMARSHAL); + } + + @Override + public void handleMessage(Message message) throws Fault { + if (message.getExchange().isSynchronous()) { + return; + } + handleAsyncResponse(message); + } + + @Override + public void handleFault(Message message) { + if (message.getExchange().isSynchronous()) { + return; + } + handleAsyncFault(message); + } + + private void handleAsyncResponse(Message message) { + JaxrsClientCallback<?> cb = message.getExchange().get(JaxrsClientCallback.class); + Response r = null; + try { + Object[] results = preProcessResult(message); + if (results != null && results.length == 1) { + r = (Response)results[0]; + } + } catch (Exception ex) { + Throwable t = ex instanceof WebApplicationException + ? (WebApplicationException)ex + : ex instanceof ProcessingException + ? (ProcessingException)ex : new ProcessingException(ex); + cb.handleException(message, t); + return; + } + doHandleAsyncResponse(message, r, cb); + } + + protected abstract void doHandleAsyncResponse(Message message, Response r, JaxrsClientCallback<?> cb); + + protected void closeAsyncResponseIfPossible(Response r, Message outMessage, JaxrsClientCallback<?> cb) { + if (responseStreamCanBeClosed(outMessage, cb.getResponseClass())) { + r.close(); + } + } + + protected void handleAsyncFault(Message message) { + } + } } http://git-wip-us.apache.org/repos/asf/cxf/blob/471f8851/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/ClientProxyImpl.java ---------------------------------------------------------------------- diff --git a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/ClientProxyImpl.java b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/ClientProxyImpl.java index 4f56836..1e1d427 100644 --- a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/ClientProxyImpl.java +++ b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/ClientProxyImpl.java @@ -48,6 +48,7 @@ import javax.ws.rs.PathParam; import javax.ws.rs.ProcessingException; import javax.ws.rs.QueryParam; import javax.ws.rs.WebApplicationException; +import javax.ws.rs.client.InvocationCallback; import javax.ws.rs.container.AsyncResponse; import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.MediaType; @@ -61,6 +62,7 @@ import org.apache.cxf.common.classloader.ClassLoaderUtils; import org.apache.cxf.common.classloader.ClassLoaderUtils.ClassLoaderHolder; import org.apache.cxf.common.i18n.BundleUtils; import org.apache.cxf.common.logging.LogUtils; +import org.apache.cxf.common.util.PrimitiveUtils; import org.apache.cxf.common.util.PropertyUtils; import org.apache.cxf.common.util.ReflectionUtil; import org.apache.cxf.common.util.StringUtils; @@ -122,6 +124,7 @@ public class ClientProxyImpl extends AbstractClient implements this.isRoot = isRoot; this.inheritHeaders = inheritHeaders; initValuesMap(varValues); + cfg.getInInterceptors().add(new ClientAsyncResponseInterceptor()); } private void initValuesMap(Object... varValues) { @@ -732,7 +735,12 @@ public class ClientProxyImpl extends AbstractClient implements reqContext.put(OperationResourceInfo.class.getName(), ori); reqContext.put("BODY_INDEX", bodyIndex); - // execute chain + // execute chain + InvocationCallback<Object> asyncCallback = checkAsyncCallback(ori, reqContext); + if (asyncCallback != null) { + doInvokeAsync(ori, outMessage, asyncCallback); + return null; + } doRunInterceptorChain(outMessage); Object[] results = preProcessResult(outMessage); @@ -745,6 +753,7 @@ public class ClientProxyImpl extends AbstractClient implements } finally { completeExchange(outMessage.getExchange(), true); } + } finally { if (origLoader != null) { origLoader.reset(); @@ -756,6 +765,54 @@ public class ClientProxyImpl extends AbstractClient implements } + private InvocationCallback<Object> checkAsyncCallback(OperationResourceInfo ori, + Map<String, Object> reqContext) { + Object callbackProp = reqContext.get(InvocationCallback.class.getName()); + if (callbackProp != null) { + if (callbackProp instanceof Collection) { + @SuppressWarnings("unchecked") + Collection<InvocationCallback<Object>> callbacks = (Collection<InvocationCallback<Object>>)callbackProp; + for (InvocationCallback<Object> callback : callbacks) { + if (doCheckAsyncCallback(ori, callback) != null) { + return callback; + } + } + } else { + @SuppressWarnings("unchecked") + InvocationCallback<Object> callback = (InvocationCallback<Object>)callbackProp; + return doCheckAsyncCallback(ori, callback); + } + } + return null; + + } + + private InvocationCallback<Object> doCheckAsyncCallback(OperationResourceInfo ori, + InvocationCallback<Object> callback) { + Type callbackOutType = getCallbackType(callback); + Class<?> callbackRespClass = getCallbackClass(callbackOutType); + + Class<?> methodReturnType = ori.getMethodToInvoke().getReturnType(); + if (callbackRespClass == Response.class + || callbackRespClass.isAssignableFrom(methodReturnType) + || PrimitiveUtils.canPrimitiveTypeBeAutoboxed(methodReturnType, callbackRespClass)) { + return callback; + } else { + return null; + } + } + + protected void doInvokeAsync(OperationResourceInfo ori, Message outMessage, + InvocationCallback<Object> asyncCallback) { + outMessage.getExchange().setSynchronous(false); + JaxrsClientCallback<?> cb = new JaxrsClientCallback<Object>(asyncCallback, + ori.getMethodToInvoke().getReturnType(), ori.getMethodToInvoke().getGenericReturnType()); + outMessage.getExchange().put(JaxrsClientCallback.class, cb); + doRunInterceptorChain(outMessage); + + + } + @Override protected Object retryInvoke(URI newRequestURI, MultivaluedMap<String, String> headers, @@ -889,4 +946,31 @@ public class ClientProxyImpl extends AbstractClient implements return anns; } } + class ClientAsyncResponseInterceptor extends AbstractClientAsyncResponseInterceptor { + @Override + protected void doHandleAsyncResponse(Message message, Response r, JaxrsClientCallback<?> cb) { + Object entity = null; + if (r == null) { + try { + entity = handleResponse(message.getExchange().getOutMessage(), + cb.getResponseClass()); + } catch (Throwable t) { + cb.handleException(message, t); + return; + } finally { + completeExchange(message.getExchange(), false); + } + } + if (cb.getResponseClass() == null || Response.class.equals(cb.getResponseClass())) { + cb.handleResponse(message, new Object[] {r}); + } else if (r != null && r.getStatus() >= 300) { + cb.handleException(message, convertToWebApplicationException(r)); + } else { + cb.handleResponse(message, new Object[] {entity}); + closeAsyncResponseIfPossible(r, message, cb); + } + } + + + } } http://git-wip-us.apache.org/repos/asf/cxf/blob/471f8851/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/WebClient.java ---------------------------------------------------------------------- diff --git a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/WebClient.java b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/WebClient.java index 0ecfcb4..89780b1 100644 --- a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/WebClient.java +++ b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/WebClient.java @@ -20,9 +20,7 @@ package org.apache.cxf.jaxrs.client; import java.io.OutputStream; import java.lang.annotation.Annotation; -import java.lang.reflect.ParameterizedType; import java.lang.reflect.Type; -import java.lang.reflect.TypeVariable; import java.net.URI; import java.util.Arrays; import java.util.Collection; @@ -67,13 +65,10 @@ import org.apache.cxf.jaxrs.impl.UriBuilderImpl; import org.apache.cxf.jaxrs.model.ParameterType; import org.apache.cxf.jaxrs.model.URITemplate; import org.apache.cxf.jaxrs.utils.HttpUtils; -import org.apache.cxf.jaxrs.utils.InjectionUtils; import org.apache.cxf.jaxrs.utils.JAXRSUtils; import org.apache.cxf.jaxrs.utils.ParameterizedCollectionType; import org.apache.cxf.message.Exchange; import org.apache.cxf.message.Message; -import org.apache.cxf.phase.AbstractPhaseInterceptor; -import org.apache.cxf.phase.Phase; /** @@ -899,34 +894,6 @@ public class WebClient extends AbstractClient { return r; } - private ParameterizedType findCallbackType(Class<?> cls) { - if (cls == null || cls == Object.class) { - return null; - } - for (Type c2 : cls.getGenericInterfaces()) { - if (c2 instanceof ParameterizedType) { - ParameterizedType pt = (ParameterizedType)c2; - if (InvocationCallback.class.equals(pt.getRawType())) { - return pt; - } - } - } - return findCallbackType(cls.getSuperclass()); - } - private Type getCallbackType(InvocationCallback<?> callback) { - Class<?> cls = callback.getClass(); - ParameterizedType pt = findCallbackType(cls); - Type actualType = null; - for (Type tp : pt.getActualTypeArguments()) { - actualType = tp; - break; - } - if (actualType instanceof TypeVariable) { - actualType = InjectionUtils.getSuperType(cls, (TypeVariable<?>)actualType); - } - return actualType; - } - protected <T> Future<T> doInvokeAsyncCallback(String httpMethod, Object body, Class<?> requestClass, @@ -934,20 +901,7 @@ public class WebClient extends AbstractClient { InvocationCallback<T> callback) { Type outType = getCallbackType(callback); - Class<?> respClass = null; - if (outType instanceof Class) { - respClass = (Class<?>)outType; - } else if (outType instanceof ParameterizedType) { - ParameterizedType pt = (ParameterizedType)outType; - if (pt.getRawType() instanceof Class) { - respClass = (Class<?>)pt.getRawType(); - } - } else if (outType == null) { - respClass = Response.class; - } - - - + Class<?> respClass = getCallbackClass(outType); return doInvokeAsync(httpMethod, body, requestClass, inType, respClass, outType, callback); } @@ -1006,53 +960,33 @@ public class WebClient extends AbstractClient { return headers; } - private void handleAsyncResponse(Message message) { - JaxrsClientCallback<?> cb = message.getExchange().get(JaxrsClientCallback.class); - Response r = null; - try { - Object[] results = preProcessResult(message); - if (results != null && results.length == 1) { - r = (Response)results[0]; + class ClientAsyncResponseInterceptor extends AbstractClientAsyncResponseInterceptor { + @Override + protected void doHandleAsyncResponse(Message message, Response r, JaxrsClientCallback<?> cb) { + if (r == null) { + try { + r = handleResponse(message.getExchange().getOutMessage(), + cb.getResponseClass(), + cb.getOutGenericType()); + } catch (Throwable t) { + cb.handleException(message, t); + return; + } finally { + completeExchange(message.getExchange(), false); + } } - } catch (Exception ex) { - Throwable t = ex instanceof WebApplicationException - ? (WebApplicationException)ex - : ex instanceof ProcessingException - ? (ProcessingException)ex : new ProcessingException(ex); - cb.handleException(message, t); - return; - } - if (r == null) { - try { - r = handleResponse(message.getExchange().getOutMessage(), - cb.getResponseClass(), - cb.getOutGenericType()); - } catch (Throwable t) { - cb.handleException(message, t); - return; - } finally { - completeExchange(message.getExchange(), false); + if (cb.getResponseClass() == null || Response.class.equals(cb.getResponseClass())) { + cb.handleResponse(message, new Object[] {r}); + } else if (r.getStatus() >= 300) { + cb.handleException(message, convertToWebApplicationException(r)); + } else { + cb.handleResponse(message, new Object[] {r.getEntity()}); + closeAsyncResponseIfPossible(r, message, cb); } } - if (cb.getResponseClass() == null || Response.class.equals(cb.getResponseClass())) { - cb.handleResponse(message, new Object[] {r}); - } else if (r.getStatus() >= 300) { - cb.handleException(message, convertToWebApplicationException(r)); - } else { - cb.handleResponse(message, new Object[] {r.getEntity()}); - closeAsyncResponseIfPossible(r, message, cb); - } } - private void closeAsyncResponseIfPossible(Response r, Message outMessage, JaxrsClientCallback<?> cb) { - if (responseStreamCanBeClosed(outMessage, cb.getResponseClass())) { - r.close(); - } - } - private void handleAsyncFault(Message message) { - } - //TODO: retry invocation will not work in case of async request failures for the moment @Override @@ -1294,29 +1228,6 @@ public class WebClient extends AbstractClient { return new SyncInvokerImpl(); } - class ClientAsyncResponseInterceptor extends AbstractPhaseInterceptor<Message> { - ClientAsyncResponseInterceptor() { - super(Phase.UNMARSHAL); - } - - @Override - public void handleMessage(Message message) throws Fault { - if (message.getExchange().isSynchronous()) { - return; - } - handleAsyncResponse(message); - } - - @Override - public void handleFault(Message message) { - if (message.getExchange().isSynchronous()) { - return; - } - handleAsyncFault(message); - } - - } - private void setEntityHeaders(Entity<?> entity) { type(entity.getMediaType()); if (entity.getLanguage() != null) { http://git-wip-us.apache.org/repos/asf/cxf/blob/471f8851/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/JAXRSAsyncClientTest.java ---------------------------------------------------------------------- diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/JAXRSAsyncClientTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/JAXRSAsyncClientTest.java index d02e117..cd5580f 100644 --- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/JAXRSAsyncClientTest.java +++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/JAXRSAsyncClientTest.java @@ -52,6 +52,7 @@ import javax.ws.rs.ext.MessageBodyReader; import javax.ws.rs.ext.MessageBodyWriter; import javax.xml.ws.Holder; +import org.apache.cxf.jaxrs.client.JAXRSClientFactory; import org.apache.cxf.jaxrs.client.WebClient; import org.apache.cxf.jaxrs.model.AbstractResourceInfo; import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase; @@ -263,6 +264,45 @@ public class JAXRSAsyncClientTest extends AbstractBusClientServerTestBase { assertTrue(((GenericInvocationCallback)callback).getResult().readEntity(Boolean.class)); } + @Test + public void testAsyncProxyPrimitiveResponse() throws Exception { + String address = "http://localhost:" + PORT; + final Holder<Boolean> holder = new Holder<Boolean>(); + final InvocationCallback<Boolean> callback = new InvocationCallback<Boolean>() { + public void completed(Boolean response) { + holder.value = response; + } + public void failed(Throwable error) { + } + }; + + BookStore store = JAXRSClientFactory.create(address, BookStore.class); + WebClient.getConfig(store).getRequestContext().put(InvocationCallback.class.getName(), callback); + store.checkBook(123L); + Thread.sleep(3000); + assertTrue(holder.value); + } + @Test + public void testAsyncProxyBookResponse() throws Exception { + String address = "http://localhost:" + PORT; + final Holder<Book> holder = new Holder<Book>(); + final InvocationCallback<Book> callback = new InvocationCallback<Book>() { + public void completed(Book response) { + holder.value = response; + } + public void failed(Throwable error) { + } + }; + + BookStore store = JAXRSClientFactory.create(address, BookStore.class); + WebClient.getConfig(store).getRequestContext().put(InvocationCallback.class.getName(), callback); + Book book = store.getBookByMatrixParams("12", "3"); + assertNull(book); + Thread.sleep(3000); + assertNotNull(holder.value); + assertEquals(123L, holder.value.getId()); + } + @SuppressWarnings({ "unchecked", "rawtypes" })
