Repository: cxf Updated Branches: refs/heads/master 9c5d9f77a -> 2d1da35c0
[CXF-6833] Prototyping Observable RxInvoker Project: http://git-wip-us.apache.org/repos/asf/cxf/repo Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/2d1da35c Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/2d1da35c Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/2d1da35c Branch: refs/heads/master Commit: 2d1da35c001514fd37bd74180c1fdaa11e2e7b00 Parents: 9c5d9f7 Author: Sergey Beryozkin <sberyoz...@gmail.com> Authored: Tue Oct 11 17:37:58 2016 +0100 Committer: Sergey Beryozkin <sberyoz...@gmail.com> Committed: Tue Oct 11 17:37:58 2016 +0100 ---------------------------------------------------------------------- .../apache/cxf/jaxrs/client/AsyncClient.java | 32 ++++ .../cxf/jaxrs/client/JaxrsClientCallback.java | 4 +- .../jaxrs/client/JaxrsClientStageCallback.java | 10 +- .../org/apache/cxf/jaxrs/client/WebClient.java | 95 +++++----- .../client/spec/InvocationBuilderImpl.java | 8 +- rt/rs/extensions/rx/pom.xml | 5 + .../client/JaxrsClientObservableCallback.java | 47 +++++ .../jaxrs/rx/client/ObservableRxInvoker.java | 106 +++++++++++ .../rx/client/ObservableRxInvokerImpl.java | 174 +++++++++++++++++++ .../jaxrs/reactive/JAXRSReactiveTest.java | 43 +++-- 10 files changed, 454 insertions(+), 70 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cxf/blob/2d1da35c/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/AsyncClient.java ---------------------------------------------------------------------- diff --git a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/AsyncClient.java b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/AsyncClient.java new file mode 100644 index 0000000..e81a090 --- /dev/null +++ b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/AsyncClient.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cxf.jaxrs.client; + +import java.lang.reflect.Type; + +//Work in progress +public interface AsyncClient { + void prepareAsyncClient(String httpMethod, + Object body, + Class<?> requestClass, + Type inType, + Class<?> respClass, + Type outType, + JaxrsClientCallback<?> cb); +} http://git-wip-us.apache.org/repos/asf/cxf/blob/2d1da35c/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/JaxrsClientCallback.java ---------------------------------------------------------------------- diff --git a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/JaxrsClientCallback.java b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/JaxrsClientCallback.java index 0d10ad5..78c717e 100644 --- a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/JaxrsClientCallback.java +++ b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/JaxrsClientCallback.java @@ -31,12 +31,12 @@ import javax.ws.rs.client.InvocationCallback; import org.apache.cxf.endpoint.ClientCallback; -class JaxrsClientCallback<T> extends ClientCallback { +public class JaxrsClientCallback<T> extends ClientCallback { private final InvocationCallback<T> handler; private final Type outType; private final Class<?> responseClass; - JaxrsClientCallback(final InvocationCallback<T> handler, + public JaxrsClientCallback(final InvocationCallback<T> handler, Class<?> responseClass, Type outGenericType) { this.handler = handler; http://git-wip-us.apache.org/repos/asf/cxf/blob/2d1da35c/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/JaxrsClientStageCallback.java ---------------------------------------------------------------------- diff --git a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/JaxrsClientStageCallback.java b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/JaxrsClientStageCallback.java index c26ffb4..116c78b 100644 --- a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/JaxrsClientStageCallback.java +++ b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/JaxrsClientStageCallback.java @@ -21,16 +21,15 @@ package org.apache.cxf.jaxrs.client; import java.lang.reflect.Type; import java.util.Map; -import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.Executor; import java.util.function.Supplier; -class JaxrsClientStageCallback<T> extends JaxrsClientCallback<T> { +public class JaxrsClientStageCallback<T> extends JaxrsClientCallback<T> { private CompletableFuture<T> cf; - JaxrsClientStageCallback(Class<?> responseClass, + public JaxrsClientStageCallback(Class<?> responseClass, Type outGenericType, Executor ex) { super(null, responseClass, outGenericType); @@ -68,7 +67,7 @@ class JaxrsClientStageCallback<T> extends JaxrsClientCallback<T> { public boolean cancel(boolean mayInterruptIfRunning) { boolean result = super.cancel(mayInterruptIfRunning); if (result) { - cf.completeExceptionally(new CancellationException()); + cf.cancel(mayInterruptIfRunning); } return result; } @@ -81,8 +80,7 @@ class JaxrsClientStageCallback<T> extends JaxrsClientCallback<T> { try { return (T)JaxrsClientStageCallback.this.get()[0]; } catch (Exception ex) { - //handler.failed((InterruptedException)ex); - //throw ex; + cf.completeExceptionally(ex); return null; } } http://git-wip-us.apache.org/repos/asf/cxf/blob/2d1da35c/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/WebClient.java ---------------------------------------------------------------------- diff --git a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/WebClient.java b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/WebClient.java index e522de2..3654072 100644 --- a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/WebClient.java +++ b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/WebClient.java @@ -20,6 +20,7 @@ package org.apache.cxf.jaxrs.client; import java.io.OutputStream; import java.lang.annotation.Annotation; +import java.lang.reflect.Constructor; import java.lang.reflect.Type; import java.net.URI; import java.util.Arrays; @@ -39,6 +40,7 @@ import javax.ws.rs.client.AsyncInvoker; import javax.ws.rs.client.CompletionStageRxInvoker; import javax.ws.rs.client.Entity; import javax.ws.rs.client.InvocationCallback; +import javax.ws.rs.client.RxInvoker; import javax.ws.rs.client.SyncInvoker; import javax.ws.rs.core.Cookie; import javax.ws.rs.core.EntityTag; @@ -80,7 +82,7 @@ import org.apache.cxf.message.Message; * Http-centric web client * */ -public class WebClient extends AbstractClient { +public class WebClient extends AbstractClient implements AsyncClient { private static final String REQUEST_CLASS = "request.class"; private static final String REQUEST_TYPE = "request.type"; private static final String REQUEST_ANNS = "request.annotations"; @@ -918,45 +920,29 @@ public class WebClient extends AbstractClient { Class<?> respClass, Type outType, InvocationCallback<T> callback) { - Annotation[] inAnns = null; - if (body instanceof Entity) { - Entity<?> entity = (Entity<?>)body; - setEntityHeaders(entity); - body = entity.getEntity(); - requestClass = body.getClass(); - inType = body.getClass(); - inAnns = entity.getAnnotations(); - } - - if (body instanceof GenericEntity) { - GenericEntity<?> genericEntity = (GenericEntity<?>)body; - body = genericEntity.getEntity(); - requestClass = genericEntity.getRawType(); - inType = genericEntity.getType(); - } - - MultivaluedMap<String, String> headers = prepareHeaders(respClass, body); - resetResponse(); - - Message m = finalizeMessage(httpMethod, headers, body, requestClass, inType, - inAnns, respClass, outType, null, null); - - m.getExchange().setSynchronous(false); JaxrsClientCallback<T> cb = new JaxrsClientCallback<T>(callback, respClass, outType); - m.getExchange().put(JaxrsClientCallback.class, cb); - - doRunInterceptorChain(m); - + prepareAsyncClient(httpMethod, body, requestClass, inType, respClass, outType, cb); return cb.createFuture(); } protected <T> CompletionStage<T> doInvokeAsyncStage(String httpMethod, Object body, - Class<?> requestClass, - Type inType, Class<?> respClass, Type outType, ExecutorService ex) { + JaxrsClientStageCallback<T> cb = new JaxrsClientStageCallback<T>(respClass, outType, ex); + prepareAsyncClient(httpMethod, body, null, null, respClass, outType, cb); + return cb.getCompletionStage(); + } + + @Override + public void prepareAsyncClient(String httpMethod, + Object body, + Class<?> requestClass, + Type inType, + Class<?> respClass, + Type outType, + JaxrsClientCallback<?> cb) { Annotation[] inAnns = null; if (body instanceof Entity) { Entity<?> entity = (Entity<?>)body; @@ -965,28 +951,24 @@ public class WebClient extends AbstractClient { requestClass = body.getClass(); inType = body.getClass(); inAnns = entity.getAnnotations(); - } - + } if (body instanceof GenericEntity) { GenericEntity<?> genericEntity = (GenericEntity<?>)body; body = genericEntity.getEntity(); requestClass = genericEntity.getRawType(); inType = genericEntity.getType(); } - + MultivaluedMap<String, String> headers = prepareHeaders(respClass, body); resetResponse(); Message m = finalizeMessage(httpMethod, headers, body, requestClass, inType, - inAnns, respClass, outType, null, null); - + inAnns, respClass, outType, null, null); + m.getExchange().setSynchronous(false); - JaxrsClientStageCallback<T> cb = new JaxrsClientStageCallback<T>(respClass, outType, ex); m.getExchange().put(JaxrsClientCallback.class, cb); - + doRunInterceptorChain(m); - - return cb.getCompletionStage(); } @@ -1288,11 +1270,34 @@ public class WebClient extends AbstractClient { // Link to JAX-RS 2.1 CompletionStageRxInvoker public CompletionStageRxInvoker rx() { - return new CompletionStageRxInvokerImpl(null); + return rx((ExecutorService)null); } public CompletionStageRxInvoker rx(ExecutorService ex) { return new CompletionStageRxInvokerImpl(ex); } + // Link to JAX-RS 2.1 RxInvoker extensions + @SuppressWarnings("rawtypes") + public <T extends RxInvoker> T rx(Class<T> clazz) { + return rx(clazz, (ExecutorService)null); + } + @SuppressWarnings({ + "rawtypes", "unchecked" + }) + public <T extends RxInvoker> T rx(Class<T> clazz, ExecutorService executorService) { + if (clazz == CompletionStageRxInvoker.class) { + return (T)rx(executorService); + } else { + String implClassName = clazz.getName() + "Impl"; + try { + Constructor c = ClassLoaderUtils.loadClass(implClassName, WebClient.class) + .getConstructor(AsyncClient.class, ExecutorService.class); + return (T)c.newInstance(this, executorService); + } catch (Throwable t) { + throw new ProcessingException(t); + } + } + + } private void setEntityHeaders(Entity<?> entity) { type(entity.getMediaType()); @@ -1731,22 +1736,22 @@ public class WebClient extends AbstractClient { @Override public <T> CompletionStage<T> method(String name, Entity<?> entity, Class<T> responseType) { - return doInvokeAsyncStage(name, entity, null, null, responseType, responseType, ex); + return doInvokeAsyncStage(name, entity, responseType, responseType, ex); } @Override public <T> CompletionStage<T> method(String name, Entity<?> entity, GenericType<T> responseType) { - return doInvokeAsyncStage(name, entity, null, null, responseType.getRawType(), responseType.getType(), ex); + return doInvokeAsyncStage(name, entity, responseType.getRawType(), responseType.getType(), ex); } @Override public <T> CompletionStage<T> method(String name, Class<T> responseType) { - return doInvokeAsyncStage(name, null, null, null, responseType, responseType, ex); + return doInvokeAsyncStage(name, null, responseType, responseType, ex); } @Override public <T> CompletionStage<T> method(String name, GenericType<T> responseType) { - return doInvokeAsyncStage(name, null, null, null, responseType.getRawType(), responseType.getType(), ex); + return doInvokeAsyncStage(name, null, responseType.getRawType(), responseType.getType(), ex); } } http://git-wip-us.apache.org/repos/asf/cxf/blob/2d1da35c/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/spec/InvocationBuilderImpl.java ---------------------------------------------------------------------- diff --git a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/spec/InvocationBuilderImpl.java b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/spec/InvocationBuilderImpl.java index 04980ee..75c9b85 100644 --- a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/spec/InvocationBuilderImpl.java +++ b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/spec/InvocationBuilderImpl.java @@ -377,7 +377,7 @@ public class InvocationBuilderImpl implements Invocation.Builder { @Override public CompletionStageRxInvoker rx() { - return webClient.rx(); + return rx((ExecutorService)null); } @Override @@ -388,15 +388,13 @@ public class InvocationBuilderImpl implements Invocation.Builder { @SuppressWarnings("rawtypes") @Override public <T extends RxInvoker> T rx(Class<T> clazz) { - // TODO: Implementation required (JAX-RS 2.1) - return null; + return rx(clazz, (ExecutorService)null); } @SuppressWarnings("rawtypes") @Override public <T extends RxInvoker> T rx(Class<T> clazz, ExecutorService executorService) { - // TODO: Implementation required (JAX-RS 2.1) - return null; + return webClient.rx(clazz, executorService); } @Override http://git-wip-us.apache.org/repos/asf/cxf/blob/2d1da35c/rt/rs/extensions/rx/pom.xml ---------------------------------------------------------------------- diff --git a/rt/rs/extensions/rx/pom.xml b/rt/rs/extensions/rx/pom.xml index a65484a..a1207a5 100644 --- a/rt/rs/extensions/rx/pom.xml +++ b/rt/rs/extensions/rx/pom.xml @@ -37,6 +37,11 @@ <version>${project.version}</version> </dependency> <dependency> + <groupId>org.apache.cxf</groupId> + <artifactId>cxf-rt-rs-client</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> <groupId>io.reactivex</groupId> <artifactId>rxjava</artifactId> <version>${cxf.rx.java.version}</version> http://git-wip-us.apache.org/repos/asf/cxf/blob/2d1da35c/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/client/JaxrsClientObservableCallback.java ---------------------------------------------------------------------- diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/client/JaxrsClientObservableCallback.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/client/JaxrsClientObservableCallback.java new file mode 100644 index 0000000..c40aef0 --- /dev/null +++ b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/client/JaxrsClientObservableCallback.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cxf.jaxrs.rx.client; + +import java.lang.reflect.Type; +import java.util.concurrent.Executor; +import java.util.concurrent.Future; + +import org.apache.cxf.jaxrs.client.JaxrsClientCallback; + +import rx.Observable; +import rx.schedulers.Schedulers; + +public class JaxrsClientObservableCallback<T> extends JaxrsClientCallback<T> { + private Observable<T> observable; + + public JaxrsClientObservableCallback(Class<?> responseClass, + Type outGenericType, + Executor ex) { + super(null, responseClass, outGenericType); + Future<T> f = super.createFuture(); + observable = ex == null ? Observable.from(f) + : Observable.from(f, Schedulers.from(ex)); + } + + public Observable<T> getObservable() { + return observable; + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cxf/blob/2d1da35c/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/client/ObservableRxInvoker.java ---------------------------------------------------------------------- diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/client/ObservableRxInvoker.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/client/ObservableRxInvoker.java new file mode 100644 index 0000000..3df1931 --- /dev/null +++ b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/client/ObservableRxInvoker.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cxf.jaxrs.rx.client; + +import javax.ws.rs.client.Entity; +import javax.ws.rs.client.RxInvoker; +import javax.ws.rs.core.GenericType; +import javax.ws.rs.core.Response; + +import rx.Observable; + +@SuppressWarnings("rawtypes") +public interface ObservableRxInvoker extends RxInvoker<Observable> { + + @Override + Observable<Response> get(); + + @Override + <T> Observable<T> get(Class<T> responseType); + + @Override + <T> Observable<T> get(GenericType<T> responseType); + + @Override + Observable<Response> put(Entity<?> entity); + + @Override + <T> Observable<T> put(Entity<?> entity, Class<T> clazz); + + @Override + <T> Observable<T> put(Entity<?> entity, GenericType<T> type); + + @Override + Observable<Response> post(Entity<?> entity); + + @Override + <T> Observable<T> post(Entity<?> entity, Class<T> clazz); + + @Override + <T> Observable<T> post(Entity<?> entity, GenericType<T> type); + + @Override + Observable<Response> delete(); + + @Override + <T> Observable<T> delete(Class<T> responseType); + + @Override + <T> Observable<T> delete(GenericType<T> responseType); + + @Override + Observable<Response> head(); + + @Override + Observable<Response> options(); + + @Override + <T> Observable<T> options(Class<T> responseType); + + @Override + <T> Observable<T> options(GenericType<T> responseType); + + @Override + Observable<Response> trace(); + + @Override + <T> Observable<T> trace(Class<T> responseType); + + @Override + <T> Observable<T> trace(GenericType<T> responseType); + + @Override + Observable<Response> method(String name); + + @Override + <T> Observable<T> method(String name, Class<T> responseType); + + @Override + <T> Observable<T> method(String name, GenericType<T> responseType); + + @Override + Observable<Response> method(String name, Entity<?> entity); + + @Override + <T> Observable<T> method(String name, Entity<?> entity, Class<T> responseType); + + @Override + <T> Observable<T> method(String name, Entity<?> entity, GenericType<T> responseType); +} + http://git-wip-us.apache.org/repos/asf/cxf/blob/2d1da35c/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/client/ObservableRxInvokerImpl.java ---------------------------------------------------------------------- diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/client/ObservableRxInvokerImpl.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/client/ObservableRxInvokerImpl.java new file mode 100644 index 0000000..222994a --- /dev/null +++ b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/client/ObservableRxInvokerImpl.java @@ -0,0 +1,174 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cxf.jaxrs.rx.client; + +import java.lang.reflect.Type; +import java.util.concurrent.ExecutorService; + +import javax.ws.rs.HttpMethod; +import javax.ws.rs.client.Entity; +import javax.ws.rs.core.GenericType; +import javax.ws.rs.core.Response; + +import org.apache.cxf.jaxrs.client.AsyncClient; + +import rx.Observable; + +public class ObservableRxInvokerImpl implements ObservableRxInvoker { + private ExecutorService ex; + private AsyncClient wc; + public ObservableRxInvokerImpl(AsyncClient wc, ExecutorService ex) { + this.wc = wc; + this.ex = ex; + } + + @Override + public Observable<Response> get() { + return get(Response.class); + } + + @Override + public <T> Observable<T> get(Class<T> responseType) { + return method(HttpMethod.GET, responseType); + } + + @Override + public <T> Observable<T> get(GenericType<T> responseType) { + return method(HttpMethod.GET, responseType); + } + + @Override + public Observable<Response> put(Entity<?> entity) { + return put(entity, Response.class); + } + + @Override + public <T> Observable<T> put(Entity<?> entity, Class<T> responseType) { + return method(HttpMethod.PUT, entity, responseType); + } + + @Override + public <T> Observable<T> put(Entity<?> entity, GenericType<T> responseType) { + return method(HttpMethod.PUT, entity, responseType); + } + + @Override + public Observable<Response> post(Entity<?> entity) { + return post(entity, Response.class); + } + + @Override + public <T> Observable<T> post(Entity<?> entity, Class<T> responseType) { + return method(HttpMethod.POST, entity, responseType); + } + + @Override + public <T> Observable<T> post(Entity<?> entity, GenericType<T> responseType) { + return method(HttpMethod.POST, entity, responseType); + } + + @Override + public Observable<Response> delete() { + return delete(Response.class); + } + + @Override + public <T> Observable<T> delete(Class<T> responseType) { + return method(HttpMethod.DELETE, responseType); + } + + @Override + public <T> Observable<T> delete(GenericType<T> responseType) { + return method(HttpMethod.DELETE, responseType); + } + + @Override + public Observable<Response> head() { + return method(HttpMethod.HEAD); + } + + @Override + public Observable<Response> options() { + return options(Response.class); + } + + @Override + public <T> Observable<T> options(Class<T> responseType) { + return method(HttpMethod.OPTIONS, responseType); + } + + @Override + public <T> Observable<T> options(GenericType<T> responseType) { + return method(HttpMethod.OPTIONS, responseType); + } + + @Override + public Observable<Response> trace() { + return trace(Response.class); + } + + @Override + public <T> Observable<T> trace(Class<T> responseType) { + return method("TRACE", responseType); + } + + @Override + public <T> Observable<T> trace(GenericType<T> responseType) { + return method("TRACE", responseType); + } + + @Override + public Observable<Response> method(String name) { + return method(name, Response.class); + } + + @Override + public Observable<Response> method(String name, Entity<?> entity) { + return method(name, entity, Response.class); + } + + @Override + public <T> Observable<T> method(String name, Entity<?> entity, Class<T> responseType) { + return doInvokeAsync(name, entity, responseType, responseType); + } + + @Override + public <T> Observable<T> method(String name, Entity<?> entity, GenericType<T> responseType) { + return doInvokeAsync(name, entity, responseType.getRawType(), responseType.getType()); + } + + @Override + public <T> Observable<T> method(String name, Class<T> responseType) { + return doInvokeAsync(name, null, responseType, responseType); + } + + @Override + public <T> Observable<T> method(String name, GenericType<T> responseType) { + return doInvokeAsync(name, null, responseType.getRawType(), responseType.getType()); + } + + protected <T> Observable<T> doInvokeAsync(String httpMethod, + Object body, + Class<?> respClass, + Type outType) { + JaxrsClientObservableCallback<T> cb = new JaxrsClientObservableCallback<T>(respClass, outType, ex); + wc.prepareAsyncClient(httpMethod, body, null, null, respClass, outType, cb); + return cb.getObservable(); + } +} http://git-wip-us.apache.org/repos/asf/cxf/blob/2d1da35c/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSReactiveTest.java ---------------------------------------------------------------------- diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSReactiveTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSReactiveTest.java index 59da436..10ce5ea 100644 --- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSReactiveTest.java +++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSReactiveTest.java @@ -21,14 +21,15 @@ package org.apache.cxf.systest.jaxrs.reactive; import java.util.Collections; import java.util.List; -import java.util.concurrent.Future; +import javax.ws.rs.NotFoundException; import javax.ws.rs.core.GenericType; import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider; import org.apache.cxf.jaxrs.client.WebClient; import org.apache.cxf.jaxrs.model.AbstractResourceInfo; +import org.apache.cxf.jaxrs.rx.client.ObservableRxInvoker; import org.apache.cxf.jaxrs.rx.provider.ObservableReader; import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase; @@ -73,15 +74,6 @@ public class JAXRSReactiveTest extends AbstractBusClientServerTestBase { obs.subscribe(s -> assertResponse(s)); } - @Test - public void testGetHelloWorldAsyncObservable() throws Exception { - String address = "http://localhost:" + PORT + "/reactive/textAsync"; - WebClient wc = WebClient.create(address); - Observable<String> obs = - getObservable(wc.accept("text/plain").async().get(String.class)); - obs.subscribe(s -> assertResponse(s)); - } - private void assertResponse(String s) { assertEquals("Hello, world!", s); } @@ -129,7 +121,34 @@ public class JAXRSReactiveTest extends AbstractBusClientServerTestBase { assertEquals("World", beans.get(1).getAudience()); } - private Observable<String> getObservable(Future<String> future) { - return Observable.from(future); + @Test + public void testGetHelloWorldAsyncObservable() throws Exception { + String address = "http://localhost:" + PORT + "/reactive/textAsync"; + WebClient wc = WebClient.create(address); + Observable<String> obs = wc.accept("text/plain") + .rx(ObservableRxInvoker.class) + .get(String.class); + obs.map(s -> { + return s + s; + }); + + Thread.sleep(3000); + + obs.subscribe(s -> assertDuplicateResponse(s)); + } + @Test + public void testGetHelloWorldAsyncObservable404() throws Exception { + String address = "http://localhost:" + PORT + "/reactive/textAsync404"; + WebClient wc = WebClient.create(address); + try { + wc.rx(ObservableRxInvoker.class).get(String.class).subscribe(s -> System.out.println()); + fail("Exception expected"); + } catch (Throwable ex) { + assertTrue(ex.getCause().getCause() instanceof NotFoundException); + } + } + + private void assertDuplicateResponse(String s) { + assertEquals("Hello, world!Hello, world!", s); } }