Repository: cxf
Updated Branches:
  refs/heads/master 9c5d9f77a -> 2d1da35c0


[CXF-6833] Prototyping Observable RxInvoker


Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/2d1da35c
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/2d1da35c
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/2d1da35c

Branch: refs/heads/master
Commit: 2d1da35c001514fd37bd74180c1fdaa11e2e7b00
Parents: 9c5d9f7
Author: Sergey Beryozkin <sberyoz...@gmail.com>
Authored: Tue Oct 11 17:37:58 2016 +0100
Committer: Sergey Beryozkin <sberyoz...@gmail.com>
Committed: Tue Oct 11 17:37:58 2016 +0100

----------------------------------------------------------------------
 .../apache/cxf/jaxrs/client/AsyncClient.java    |  32 ++++
 .../cxf/jaxrs/client/JaxrsClientCallback.java   |   4 +-
 .../jaxrs/client/JaxrsClientStageCallback.java  |  10 +-
 .../org/apache/cxf/jaxrs/client/WebClient.java  |  95 +++++-----
 .../client/spec/InvocationBuilderImpl.java      |   8 +-
 rt/rs/extensions/rx/pom.xml                     |   5 +
 .../client/JaxrsClientObservableCallback.java   |  47 +++++
 .../jaxrs/rx/client/ObservableRxInvoker.java    | 106 +++++++++++
 .../rx/client/ObservableRxInvokerImpl.java      | 174 +++++++++++++++++++
 .../jaxrs/reactive/JAXRSReactiveTest.java       |  43 +++--
 10 files changed, 454 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cxf/blob/2d1da35c/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/AsyncClient.java
----------------------------------------------------------------------
diff --git 
a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/AsyncClient.java 
b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/AsyncClient.java
new file mode 100644
index 0000000..e81a090
--- /dev/null
+++ b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/AsyncClient.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.client;
+
+import java.lang.reflect.Type;
+
+//Work in progress
+public interface AsyncClient {
+    void prepareAsyncClient(String httpMethod, 
+                            Object body,
+                            Class<?> requestClass,
+                            Type inType,
+                            Class<?> respClass,
+                            Type outType,
+                            JaxrsClientCallback<?> cb);
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/2d1da35c/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/JaxrsClientCallback.java
----------------------------------------------------------------------
diff --git 
a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/JaxrsClientCallback.java
 
b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/JaxrsClientCallback.java
index 0d10ad5..78c717e 100644
--- 
a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/JaxrsClientCallback.java
+++ 
b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/JaxrsClientCallback.java
@@ -31,12 +31,12 @@ import javax.ws.rs.client.InvocationCallback;
 
 import org.apache.cxf.endpoint.ClientCallback;
 
-class JaxrsClientCallback<T> extends ClientCallback {
+public class JaxrsClientCallback<T> extends ClientCallback {
     private final InvocationCallback<T> handler;
     private final Type outType;
     private final Class<?> responseClass;
     
-    JaxrsClientCallback(final InvocationCallback<T> handler, 
+    public JaxrsClientCallback(final InvocationCallback<T> handler, 
                         Class<?> responseClass, 
                         Type outGenericType) {
         this.handler = handler;

http://git-wip-us.apache.org/repos/asf/cxf/blob/2d1da35c/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/JaxrsClientStageCallback.java
----------------------------------------------------------------------
diff --git 
a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/JaxrsClientStageCallback.java
 
b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/JaxrsClientStageCallback.java
index c26ffb4..116c78b 100644
--- 
a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/JaxrsClientStageCallback.java
+++ 
b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/JaxrsClientStageCallback.java
@@ -21,16 +21,15 @@ package org.apache.cxf.jaxrs.client;
 
 import java.lang.reflect.Type;
 import java.util.Map;
-import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.Executor;
 import java.util.function.Supplier;
 
-class JaxrsClientStageCallback<T> extends JaxrsClientCallback<T>  {
+public class JaxrsClientStageCallback<T> extends JaxrsClientCallback<T>  {
     private CompletableFuture<T> cf;
     
-    JaxrsClientStageCallback(Class<?> responseClass, 
+    public JaxrsClientStageCallback(Class<?> responseClass, 
                              Type outGenericType,
                              Executor ex) {
         super(null, responseClass, outGenericType);
@@ -68,7 +67,7 @@ class JaxrsClientStageCallback<T> extends 
JaxrsClientCallback<T>  {
     public boolean cancel(boolean mayInterruptIfRunning) {
         boolean result = super.cancel(mayInterruptIfRunning);
         if (result) {
-            cf.completeExceptionally(new CancellationException());
+            cf.cancel(mayInterruptIfRunning);
         }
         return result;
     }
@@ -81,8 +80,7 @@ class JaxrsClientStageCallback<T> extends 
JaxrsClientCallback<T>  {
             try {
                 return (T)JaxrsClientStageCallback.this.get()[0];
             } catch (Exception ex) {
-                //handler.failed((InterruptedException)ex);
-                //throw ex;
+                cf.completeExceptionally(ex);
                 return null;
             }
         }

http://git-wip-us.apache.org/repos/asf/cxf/blob/2d1da35c/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/WebClient.java
----------------------------------------------------------------------
diff --git 
a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/WebClient.java 
b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/WebClient.java
index e522de2..3654072 100644
--- a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/WebClient.java
+++ b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/WebClient.java
@@ -20,6 +20,7 @@ package org.apache.cxf.jaxrs.client;
 
 import java.io.OutputStream;
 import java.lang.annotation.Annotation;
+import java.lang.reflect.Constructor;
 import java.lang.reflect.Type;
 import java.net.URI;
 import java.util.Arrays;
@@ -39,6 +40,7 @@ import javax.ws.rs.client.AsyncInvoker;
 import javax.ws.rs.client.CompletionStageRxInvoker;
 import javax.ws.rs.client.Entity;
 import javax.ws.rs.client.InvocationCallback;
+import javax.ws.rs.client.RxInvoker;
 import javax.ws.rs.client.SyncInvoker;
 import javax.ws.rs.core.Cookie;
 import javax.ws.rs.core.EntityTag;
@@ -80,7 +82,7 @@ import org.apache.cxf.message.Message;
  * Http-centric web client
  *
  */
-public class WebClient extends AbstractClient {
+public class WebClient extends AbstractClient implements AsyncClient {
     private static final String REQUEST_CLASS = "request.class";
     private static final String REQUEST_TYPE = "request.type";
     private static final String REQUEST_ANNS = "request.annotations";
@@ -918,45 +920,29 @@ public class WebClient extends AbstractClient {
                                           Class<?> respClass,
                                           Type outType,
                                           InvocationCallback<T> callback) {
-        Annotation[] inAnns = null;
-        if (body instanceof Entity) {
-            Entity<?> entity = (Entity<?>)body;
-            setEntityHeaders(entity);
-            body = entity.getEntity();
-            requestClass = body.getClass();
-            inType = body.getClass();
-            inAnns = entity.getAnnotations();
-        }
-        
-        if (body instanceof GenericEntity) {
-            GenericEntity<?> genericEntity = (GenericEntity<?>)body;
-            body = genericEntity.getEntity();
-            requestClass = genericEntity.getRawType();
-            inType = genericEntity.getType();
-        }
-        
-        MultivaluedMap<String, String> headers = prepareHeaders(respClass, 
body);
-        resetResponse();
-
-        Message m = finalizeMessage(httpMethod, headers, body, requestClass, 
inType, 
-                                    inAnns, respClass, outType, null, null);
-        
-        m.getExchange().setSynchronous(false);
         JaxrsClientCallback<T> cb = new JaxrsClientCallback<T>(callback, 
respClass, outType);
-        m.getExchange().put(JaxrsClientCallback.class, cb);
-        
-        doRunInterceptorChain(m);
-        
+        prepareAsyncClient(httpMethod, body, requestClass, inType, respClass, 
outType, cb);        
         return cb.createFuture();
     }
     
     protected <T> CompletionStage<T> doInvokeAsyncStage(String httpMethod, 
                                           Object body, 
-                                          Class<?> requestClass,
-                                          Type inType,
                                           Class<?> respClass,
                                           Type outType,
                                           ExecutorService ex) {
+        JaxrsClientStageCallback<T> cb = new 
JaxrsClientStageCallback<T>(respClass, outType, ex);
+        prepareAsyncClient(httpMethod, body, null, null, respClass, outType, 
cb);
+        return cb.getCompletionStage();
+    }
+    
+    @Override
+    public void prepareAsyncClient(String httpMethod, 
+                                   Object body,
+                                   Class<?> requestClass,
+                                   Type inType,
+                                   Class<?> respClass,
+                                   Type outType,
+                                   JaxrsClientCallback<?> cb) {
         Annotation[] inAnns = null;
         if (body instanceof Entity) {
             Entity<?> entity = (Entity<?>)body;
@@ -965,28 +951,24 @@ public class WebClient extends AbstractClient {
             requestClass = body.getClass();
             inType = body.getClass();
             inAnns = entity.getAnnotations();
-        }
-        
+        } 
         if (body instanceof GenericEntity) {
             GenericEntity<?> genericEntity = (GenericEntity<?>)body;
             body = genericEntity.getEntity();
             requestClass = genericEntity.getRawType();
             inType = genericEntity.getType();
         }
-        
+      
         MultivaluedMap<String, String> headers = prepareHeaders(respClass, 
body);
         resetResponse();
 
         Message m = finalizeMessage(httpMethod, headers, body, requestClass, 
inType, 
-                                    inAnns, respClass, outType, null, null);
-        
+                                  inAnns, respClass, outType, null, null);
+      
         m.getExchange().setSynchronous(false);
-        JaxrsClientStageCallback<T> cb = new 
JaxrsClientStageCallback<T>(respClass, outType, ex);
         m.getExchange().put(JaxrsClientCallback.class, cb);
-        
+      
         doRunInterceptorChain(m);
-        
-        return cb.getCompletionStage();
     }
 
     
@@ -1288,11 +1270,34 @@ public class WebClient extends AbstractClient {
     
     // Link to JAX-RS 2.1 CompletionStageRxInvoker
     public CompletionStageRxInvoker rx() {
-        return new CompletionStageRxInvokerImpl(null);
+        return rx((ExecutorService)null);
     }
     public CompletionStageRxInvoker rx(ExecutorService ex) {
         return new CompletionStageRxInvokerImpl(ex);
     }
+    // Link to JAX-RS 2.1 RxInvoker extensions
+    @SuppressWarnings("rawtypes")
+    public <T extends RxInvoker> T rx(Class<T> clazz) {
+        return rx(clazz, (ExecutorService)null);
+    }
+    @SuppressWarnings({
+     "rawtypes", "unchecked"
+    })
+    public <T extends RxInvoker> T rx(Class<T> clazz, ExecutorService 
executorService) {
+        if (clazz == CompletionStageRxInvoker.class) {
+            return (T)rx(executorService);
+        } else {
+            String implClassName = clazz.getName() + "Impl";
+            try {
+                Constructor c = ClassLoaderUtils.loadClass(implClassName, 
WebClient.class)
+                    .getConstructor(AsyncClient.class, ExecutorService.class);
+                return (T)c.newInstance(this, executorService);
+            } catch (Throwable t) {
+                throw new ProcessingException(t);
+            }
+        }
+        
+    }
     
     private void setEntityHeaders(Entity<?> entity) {
         type(entity.getMediaType());
@@ -1731,22 +1736,22 @@ public class WebClient extends AbstractClient {
 
         @Override
         public <T> CompletionStage<T> method(String name, Entity<?> entity, 
Class<T> responseType) {
-            return doInvokeAsyncStage(name, entity, null, null, responseType, 
responseType, ex);
+            return doInvokeAsyncStage(name, entity, responseType, 
responseType, ex);
         }
 
         @Override
         public <T> CompletionStage<T> method(String name, Entity<?> entity, 
GenericType<T> responseType) {
-            return doInvokeAsyncStage(name, entity, null, null, 
responseType.getRawType(), responseType.getType(), ex);
+            return doInvokeAsyncStage(name, entity, responseType.getRawType(), 
responseType.getType(), ex);
         }
 
         @Override
         public <T> CompletionStage<T> method(String name, Class<T> 
responseType) {
-            return doInvokeAsyncStage(name, null, null, null, responseType, 
responseType, ex);
+            return doInvokeAsyncStage(name, null, responseType, responseType, 
ex);
         }
 
         @Override
         public <T> CompletionStage<T> method(String name, GenericType<T> 
responseType) {
-            return doInvokeAsyncStage(name, null, null, null, 
responseType.getRawType(), responseType.getType(), ex);
+            return doInvokeAsyncStage(name, null, responseType.getRawType(), 
responseType.getType(), ex);
         }
              
     }

http://git-wip-us.apache.org/repos/asf/cxf/blob/2d1da35c/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/spec/InvocationBuilderImpl.java
----------------------------------------------------------------------
diff --git 
a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/spec/InvocationBuilderImpl.java
 
b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/spec/InvocationBuilderImpl.java
index 04980ee..75c9b85 100644
--- 
a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/spec/InvocationBuilderImpl.java
+++ 
b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/spec/InvocationBuilderImpl.java
@@ -377,7 +377,7 @@ public class InvocationBuilderImpl implements 
Invocation.Builder {
 
     @Override
     public CompletionStageRxInvoker rx() {
-        return webClient.rx();
+        return rx((ExecutorService)null);
     }
 
     @Override
@@ -388,15 +388,13 @@ public class InvocationBuilderImpl implements 
Invocation.Builder {
     @SuppressWarnings("rawtypes")
     @Override
     public <T extends RxInvoker> T rx(Class<T> clazz) {
-        // TODO: Implementation required (JAX-RS 2.1)
-        return null;
+        return rx(clazz, (ExecutorService)null);
     }
 
     @SuppressWarnings("rawtypes")
     @Override
     public <T extends RxInvoker> T rx(Class<T> clazz, ExecutorService 
executorService) {
-        // TODO: Implementation required (JAX-RS 2.1)
-        return null;
+        return webClient.rx(clazz, executorService);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cxf/blob/2d1da35c/rt/rs/extensions/rx/pom.xml
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/rx/pom.xml b/rt/rs/extensions/rx/pom.xml
index a65484a..a1207a5 100644
--- a/rt/rs/extensions/rx/pom.xml
+++ b/rt/rs/extensions/rx/pom.xml
@@ -37,6 +37,11 @@
             <version>${project.version}</version>
         </dependency>
         <dependency>
+            <groupId>org.apache.cxf</groupId>
+            <artifactId>cxf-rt-rs-client</artifactId>
+            <version>${project.version}</version>
+        </dependency> 
+        <dependency>
           <groupId>io.reactivex</groupId>
           <artifactId>rxjava</artifactId>
           <version>${cxf.rx.java.version}</version>

http://git-wip-us.apache.org/repos/asf/cxf/blob/2d1da35c/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/client/JaxrsClientObservableCallback.java
----------------------------------------------------------------------
diff --git 
a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/client/JaxrsClientObservableCallback.java
 
b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/client/JaxrsClientObservableCallback.java
new file mode 100644
index 0000000..c40aef0
--- /dev/null
+++ 
b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/client/JaxrsClientObservableCallback.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.jaxrs.rx.client;
+
+import java.lang.reflect.Type;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Future;
+
+import org.apache.cxf.jaxrs.client.JaxrsClientCallback;
+
+import rx.Observable;
+import rx.schedulers.Schedulers;
+
+public class JaxrsClientObservableCallback<T> extends JaxrsClientCallback<T> {
+    private Observable<T> observable;
+    
+    public JaxrsClientObservableCallback(Class<?> responseClass, 
+                                  Type outGenericType,
+                                  Executor ex) {
+        super(null, responseClass, outGenericType);
+        Future<T> f = super.createFuture();
+        observable = ex == null ? Observable.from(f) 
+            : Observable.from(f, Schedulers.from(ex));
+    }
+    
+    public Observable<T> getObservable() {
+        return observable;
+    }
+    
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cxf/blob/2d1da35c/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/client/ObservableRxInvoker.java
----------------------------------------------------------------------
diff --git 
a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/client/ObservableRxInvoker.java
 
b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/client/ObservableRxInvoker.java
new file mode 100644
index 0000000..3df1931
--- /dev/null
+++ 
b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/client/ObservableRxInvoker.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.rx.client;
+
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.client.RxInvoker;
+import javax.ws.rs.core.GenericType;
+import javax.ws.rs.core.Response;
+
+import rx.Observable;
+
+@SuppressWarnings("rawtypes")
+public interface ObservableRxInvoker extends RxInvoker<Observable> {
+
+    @Override
+    Observable<Response> get();
+
+    @Override
+    <T> Observable<T> get(Class<T> responseType);
+
+    @Override
+    <T> Observable<T> get(GenericType<T> responseType);
+
+    @Override
+    Observable<Response> put(Entity<?> entity);
+
+    @Override
+    <T> Observable<T> put(Entity<?> entity, Class<T> clazz);
+
+    @Override
+    <T> Observable<T> put(Entity<?> entity, GenericType<T> type);
+
+    @Override
+    Observable<Response> post(Entity<?> entity);
+
+    @Override
+    <T> Observable<T> post(Entity<?> entity, Class<T> clazz);
+
+    @Override
+    <T> Observable<T> post(Entity<?> entity, GenericType<T> type);
+
+    @Override
+    Observable<Response> delete();
+
+    @Override
+    <T> Observable<T> delete(Class<T> responseType);
+
+    @Override
+    <T> Observable<T> delete(GenericType<T> responseType);
+
+    @Override
+    Observable<Response> head();
+
+    @Override
+    Observable<Response> options();
+
+    @Override
+    <T> Observable<T> options(Class<T> responseType);
+
+    @Override
+    <T> Observable<T> options(GenericType<T> responseType);
+
+    @Override
+    Observable<Response> trace();
+
+    @Override
+    <T> Observable<T> trace(Class<T> responseType);
+
+    @Override
+    <T> Observable<T> trace(GenericType<T> responseType);
+
+    @Override
+    Observable<Response> method(String name);
+
+    @Override
+    <T> Observable<T> method(String name, Class<T> responseType);
+
+    @Override
+    <T> Observable<T> method(String name, GenericType<T> responseType);
+
+    @Override
+    Observable<Response> method(String name, Entity<?> entity);
+
+    @Override
+    <T> Observable<T> method(String name, Entity<?> entity, Class<T> 
responseType);
+
+    @Override
+    <T> Observable<T> method(String name, Entity<?> entity, GenericType<T> 
responseType);
+}
+

http://git-wip-us.apache.org/repos/asf/cxf/blob/2d1da35c/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/client/ObservableRxInvokerImpl.java
----------------------------------------------------------------------
diff --git 
a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/client/ObservableRxInvokerImpl.java
 
b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/client/ObservableRxInvokerImpl.java
new file mode 100644
index 0000000..222994a
--- /dev/null
+++ 
b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/client/ObservableRxInvokerImpl.java
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.rx.client;
+
+import java.lang.reflect.Type;
+import java.util.concurrent.ExecutorService;
+
+import javax.ws.rs.HttpMethod;
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.core.GenericType;
+import javax.ws.rs.core.Response;
+
+import org.apache.cxf.jaxrs.client.AsyncClient;
+
+import rx.Observable;
+
+public class ObservableRxInvokerImpl implements ObservableRxInvoker {
+    private ExecutorService ex;
+    private AsyncClient wc;
+    public ObservableRxInvokerImpl(AsyncClient wc, ExecutorService ex) {
+        this.wc = wc;
+        this.ex = ex;
+    }
+    
+    @Override
+    public Observable<Response> get() {
+        return get(Response.class);
+    }
+
+    @Override
+    public <T> Observable<T> get(Class<T> responseType) {
+        return method(HttpMethod.GET, responseType);
+    }
+
+    @Override
+    public <T> Observable<T> get(GenericType<T> responseType) {
+        return method(HttpMethod.GET, responseType);
+    }
+
+    @Override
+    public Observable<Response> put(Entity<?> entity) {
+        return put(entity, Response.class);
+    }
+
+    @Override
+    public <T> Observable<T> put(Entity<?> entity, Class<T> responseType) {
+        return method(HttpMethod.PUT, entity, responseType);
+    }
+
+    @Override
+    public <T> Observable<T> put(Entity<?> entity, GenericType<T> 
responseType) {
+        return method(HttpMethod.PUT, entity, responseType);
+    }
+
+    @Override
+    public Observable<Response> post(Entity<?> entity) {
+        return post(entity, Response.class);
+    }
+
+    @Override
+    public <T> Observable<T> post(Entity<?> entity, Class<T> responseType) {
+        return method(HttpMethod.POST, entity, responseType);
+    }
+
+    @Override
+    public <T> Observable<T> post(Entity<?> entity, GenericType<T> 
responseType) {
+        return method(HttpMethod.POST, entity, responseType);
+    }
+
+    @Override
+    public Observable<Response> delete() {
+        return delete(Response.class);
+    }
+
+    @Override
+    public <T> Observable<T> delete(Class<T> responseType) {
+        return method(HttpMethod.DELETE, responseType);
+    }
+
+    @Override
+    public <T> Observable<T> delete(GenericType<T> responseType) {
+        return method(HttpMethod.DELETE, responseType);
+    }
+
+    @Override
+    public Observable<Response> head() {
+        return method(HttpMethod.HEAD);
+    }
+
+    @Override
+    public Observable<Response> options() {
+        return options(Response.class);
+    }
+
+    @Override
+    public <T> Observable<T> options(Class<T> responseType) {
+        return method(HttpMethod.OPTIONS, responseType);
+    }
+
+    @Override
+    public <T> Observable<T> options(GenericType<T> responseType) {
+        return method(HttpMethod.OPTIONS, responseType);
+    }
+
+    @Override
+    public Observable<Response> trace() {
+        return trace(Response.class);
+    }
+
+    @Override
+    public <T> Observable<T> trace(Class<T> responseType) {
+        return method("TRACE", responseType);
+    }
+
+    @Override
+    public <T> Observable<T> trace(GenericType<T> responseType) {
+        return method("TRACE", responseType);
+    }
+
+    @Override
+    public Observable<Response> method(String name) {
+        return method(name, Response.class);
+    }
+
+    @Override
+    public Observable<Response> method(String name, Entity<?> entity) {
+        return method(name, entity, Response.class);
+    }
+
+    @Override
+    public <T> Observable<T> method(String name, Entity<?> entity, Class<T> 
responseType) {
+        return doInvokeAsync(name, entity, responseType, responseType);
+    }
+
+    @Override
+    public <T> Observable<T> method(String name, Entity<?> entity, 
GenericType<T> responseType) {
+        return doInvokeAsync(name, entity, responseType.getRawType(), 
responseType.getType());
+    }
+
+    @Override
+    public <T> Observable<T> method(String name, Class<T> responseType) {
+        return doInvokeAsync(name, null, responseType, responseType);
+    }
+
+    @Override
+    public <T> Observable<T> method(String name, GenericType<T> responseType) {
+        return doInvokeAsync(name, null, responseType.getRawType(), 
responseType.getType());
+    }
+    
+    protected <T> Observable<T> doInvokeAsync(String httpMethod, 
+                                              Object body, 
+                                              Class<?> respClass,
+                                              Type outType) {
+        JaxrsClientObservableCallback<T> cb = new 
JaxrsClientObservableCallback<T>(respClass, outType, ex);
+        wc.prepareAsyncClient(httpMethod, body, null, null, respClass, 
outType, cb);
+        return cb.getObservable();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/2d1da35c/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSReactiveTest.java
----------------------------------------------------------------------
diff --git 
a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSReactiveTest.java
 
b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSReactiveTest.java
index 59da436..10ce5ea 100644
--- 
a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSReactiveTest.java
+++ 
b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSReactiveTest.java
@@ -21,14 +21,15 @@ package org.apache.cxf.systest.jaxrs.reactive;
 
 import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.Future;
 
+import javax.ws.rs.NotFoundException;
 import javax.ws.rs.core.GenericType;
 
 import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
 
 import org.apache.cxf.jaxrs.client.WebClient;
 import org.apache.cxf.jaxrs.model.AbstractResourceInfo;
+import org.apache.cxf.jaxrs.rx.client.ObservableRxInvoker;
 import org.apache.cxf.jaxrs.rx.provider.ObservableReader;
 import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
 
@@ -73,15 +74,6 @@ public class JAXRSReactiveTest extends 
AbstractBusClientServerTestBase {
         obs.subscribe(s -> assertResponse(s));
     }
     
-    @Test
-    public void testGetHelloWorldAsyncObservable() throws Exception {
-        String address = "http://localhost:" + PORT + "/reactive/textAsync";
-        WebClient wc = WebClient.create(address);
-        Observable<String> obs = 
-            getObservable(wc.accept("text/plain").async().get(String.class));
-        obs.subscribe(s -> assertResponse(s));
-    }
-    
     private void assertResponse(String s) {
         assertEquals("Hello, world!", s);
     }
@@ -129,7 +121,34 @@ public class JAXRSReactiveTest extends 
AbstractBusClientServerTestBase {
         assertEquals("World", beans.get(1).getAudience());
     }
     
-    private Observable<String> getObservable(Future<String> future) {
-        return Observable.from(future);
+    @Test
+    public void testGetHelloWorldAsyncObservable() throws Exception {
+        String address = "http://localhost:" + PORT + "/reactive/textAsync";
+        WebClient wc = WebClient.create(address);
+        Observable<String> obs = wc.accept("text/plain")
+            .rx(ObservableRxInvoker.class)
+            .get(String.class);
+        obs.map(s -> { 
+            return s + s; 
+        });
+        
+        Thread.sleep(3000);
+        
+        obs.subscribe(s -> assertDuplicateResponse(s));
+    }
+    @Test
+    public void testGetHelloWorldAsyncObservable404() throws Exception {
+        String address = "http://localhost:" + PORT + "/reactive/textAsync404";
+        WebClient wc = WebClient.create(address);
+        try {
+            wc.rx(ObservableRxInvoker.class).get(String.class).subscribe(s -> 
System.out.println());
+            fail("Exception expected");
+        } catch (Throwable ex) {
+            assertTrue(ex.getCause().getCause() instanceof NotFoundException);
+        }
+    }
+    
+    private void assertDuplicateResponse(String s) {
+        assertEquals("Hello, world!Hello, world!", s);
     }
 }

Reply via email to