reta closed pull request #451: CXF-7854: Refactor RxJava2 Flowable and 
Observable Rx Invokers to not…
URL: https://github.com/apache/cxf/pull/451
 
 
   

This is a PR merged from a forked repository (a "foreign" pull request).
Because GitHub hides the original diff of such a pull request once it is
merged, the diff is reproduced below for the sake of provenance:

diff --git 
a/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/client/FlowableRxInvokerImpl.java
 
b/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/client/FlowableRxInvokerImpl.java
index be5c2d54eef..f34ba566ce4 100644
--- 
a/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/client/FlowableRxInvokerImpl.java
+++ 
b/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/client/FlowableRxInvokerImpl.java
@@ -19,24 +19,28 @@
 package org.apache.cxf.jaxrs.rx2.client;
 
 import java.util.concurrent.ExecutorService;
+import java.util.function.Supplier;
 
 import javax.ws.rs.HttpMethod;
 import javax.ws.rs.client.Entity;
+import javax.ws.rs.client.SyncInvoker;
 import javax.ws.rs.core.GenericType;
 import javax.ws.rs.core.Response;
 
-import org.apache.cxf.jaxrs.client.WebClient;
-
+import io.reactivex.BackpressureStrategy;
 import io.reactivex.Flowable;
+import io.reactivex.FlowableEmitter;
+import io.reactivex.FlowableOnSubscribe;
 import io.reactivex.Scheduler;
 import io.reactivex.schedulers.Schedulers;
 
 
 public class FlowableRxInvokerImpl implements FlowableRxInvoker {
     private Scheduler sc;
-    private WebClient wc;
-    public FlowableRxInvokerImpl(WebClient wc, ExecutorService ex) {
-        this.wc = wc;
+    private SyncInvoker syncInvoker;
+    
+    public FlowableRxInvokerImpl(SyncInvoker syncInvoker, ExecutorService ex) {
+        this.syncInvoker = syncInvoker;
         this.sc = ex == null ? null : Schedulers.from(ex);
     }
 
@@ -147,34 +151,50 @@ public FlowableRxInvokerImpl(WebClient wc, 
ExecutorService ex) {
 
     @Override
     public <T> Flowable<T> method(String name, Entity<?> entity, Class<T> 
responseType) {
-        if (sc == null) {
-            return Flowable.fromFuture(wc.async().method(name, entity, 
responseType));
-        }
-        return Flowable.fromFuture(wc.async().method(name, entity, 
responseType), sc);
+        return create(() -> syncInvoker.method(name, entity, responseType));
     }
-
+    
     @Override
     public <T> Flowable<T> method(String name, Entity<?> entity, 
GenericType<T> responseType) {
-        if (sc == null) {
-            return Flowable.fromFuture(wc.async().method(name, entity, 
responseType));
-        }
-        return Flowable.fromFuture(wc.async().method(name, entity, 
responseType), sc);
+        return create(() -> syncInvoker.method(name, entity, responseType));
     }
 
     @Override
     public <T> Flowable<T> method(String name, Class<T> responseType) {
-        if (sc == null) {
-            return Flowable.fromFuture(wc.async().method(name, responseType));
-        }
-        return Flowable.fromFuture(wc.async().method(name, responseType), sc);
+        return create(() -> syncInvoker.method(name, responseType));
     }
 
     @Override
     public <T> Flowable<T> method(String name, GenericType<T> responseType) {
+        return create(() -> syncInvoker.method(name, responseType));
+    }
+    
+    private <T> Flowable<T> create(Supplier<T> supplier) {
+        Flowable<T> flowable = Flowable.create(new FlowableOnSubscribe<T>() {
+            @Override
+            public void subscribe(FlowableEmitter<T> emitter) throws Exception 
{
+                try {
+                    T response = supplier.get();
+                    if (!emitter.isCancelled()) {
+                        emitter.onNext(response);
+                    }
+                    
+                    if (!emitter.isCancelled()) {
+                        emitter.onComplete();
+                    }
+                } catch (Throwable e) {
+                    if (!emitter.isCancelled()) {
+                        emitter.onError(e);
+                    }
+                }
+            }
+        }, BackpressureStrategy.DROP);
+        
         if (sc == null) {
-            return Flowable.fromFuture(wc.async().method(name, responseType));
+            return flowable.subscribeOn(Schedulers.io());
         }
-        return Flowable.fromFuture(wc.async().method(name, responseType), sc);
+        
+        return flowable.subscribeOn(sc).observeOn(sc);
     }
 
 }
diff --git 
a/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/client/FlowableRxInvokerProvider.java
 
b/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/client/FlowableRxInvokerProvider.java
index e4e0e71196d..6b61b517ae7 100644
--- 
a/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/client/FlowableRxInvokerProvider.java
+++ 
b/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/client/FlowableRxInvokerProvider.java
@@ -24,17 +24,12 @@
 import javax.ws.rs.client.SyncInvoker;
 import javax.ws.rs.ext.Provider;
 
-import org.apache.cxf.jaxrs.client.SyncInvokerImpl;
-
 @Provider
 public class FlowableRxInvokerProvider implements 
RxInvokerProvider<FlowableRxInvoker> {
 
     @Override
     public FlowableRxInvoker getRxInvoker(SyncInvoker syncInvoker, 
ExecutorService executorService) {
-        // TODO: At the moment we still delegate if possible to the async HTTP 
conduit.
-        // Investigate if letting the RxJava thread pool deal with the sync 
invocation
-        // is indeed more effective
-        return new 
FlowableRxInvokerImpl(((SyncInvokerImpl)syncInvoker).getWebClient(), 
executorService);
+        return new FlowableRxInvokerImpl(syncInvoker, executorService);
     }
 
     @Override
diff --git 
a/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/client/ObservableRxInvokerImpl.java
 
b/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/client/ObservableRxInvokerImpl.java
index 2c1f966a325..1cb1b9f5add 100644
--- 
a/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/client/ObservableRxInvokerImpl.java
+++ 
b/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/client/ObservableRxInvokerImpl.java
@@ -19,24 +19,26 @@
 package org.apache.cxf.jaxrs.rx2.client;
 
 import java.util.concurrent.ExecutorService;
+import java.util.function.Supplier;
 
 import javax.ws.rs.HttpMethod;
 import javax.ws.rs.client.Entity;
+import javax.ws.rs.client.SyncInvoker;
 import javax.ws.rs.core.GenericType;
 import javax.ws.rs.core.Response;
 
-import org.apache.cxf.jaxrs.client.WebClient;
-
 import io.reactivex.Observable;
+import io.reactivex.ObservableEmitter;
+import io.reactivex.ObservableOnSubscribe;
 import io.reactivex.Scheduler;
 import io.reactivex.schedulers.Schedulers;
 
 
 public class ObservableRxInvokerImpl implements ObservableRxInvoker {
     private Scheduler sc;
-    private WebClient wc;
-    public ObservableRxInvokerImpl(WebClient wc, ExecutorService ex) {
-        this.wc = wc;
+    private SyncInvoker syncInvoker;
+    public ObservableRxInvokerImpl(SyncInvoker syncInvoker, ExecutorService 
ex) {
+        this.syncInvoker = syncInvoker;
         this.sc = ex == null ? null : Schedulers.from(ex);
     }
 
@@ -147,34 +149,50 @@ public ObservableRxInvokerImpl(WebClient wc, 
ExecutorService ex) {
 
     @Override
     public <T> Observable<T> method(String name, Entity<?> entity, Class<T> 
responseType) {
-        if (sc == null) {
-            return Observable.fromFuture(wc.async().method(name, entity, 
responseType));
-        }
-        return Observable.fromFuture(wc.async().method(name, entity, 
responseType), sc);
+        return create(() -> syncInvoker.method(name, entity, responseType));
     }
-
+    
     @Override
     public <T> Observable<T> method(String name, Entity<?> entity, 
GenericType<T> responseType) {
-        if (sc == null) {
-            return Observable.fromFuture(wc.async().method(name, entity, 
responseType));
-        }
-        return Observable.fromFuture(wc.async().method(name, entity, 
responseType), sc);
+        return create(() -> syncInvoker.method(name, entity, responseType));
     }
 
     @Override
     public <T> Observable<T> method(String name, Class<T> responseType) {
-        if (sc == null) {
-            return Observable.fromFuture(wc.async().method(name, 
responseType));
-        }
-        return Observable.fromFuture(wc.async().method(name, responseType), 
sc);
+        return create(() -> syncInvoker.method(name, responseType));
     }
 
     @Override
     public <T> Observable<T> method(String name, GenericType<T> responseType) {
+        return create(() -> syncInvoker.method(name, responseType));
+    }
+    
+    private <T> Observable<T> create(Supplier<T> supplier) {
+        Observable<T> observable = Observable.create(new 
ObservableOnSubscribe<T>() {
+            @Override
+            public void subscribe(ObservableEmitter<T> emitter) throws 
Exception {
+                try {
+                    T response = supplier.get();
+                    if (!emitter.isDisposed()) {
+                        emitter.onNext(response);
+                    }
+                    
+                    if (!emitter.isDisposed()) {
+                        emitter.onComplete();
+                    }
+                } catch (Throwable e) {
+                    if (!emitter.isDisposed()) {
+                        emitter.onError(e);
+                    }
+                }
+            }
+        });
+        
         if (sc == null) {
-            return Observable.fromFuture(wc.async().method(name, 
responseType));
+            return observable.subscribeOn(Schedulers.io());
         }
-        return Observable.fromFuture(wc.async().method(name, responseType), 
sc);
+        
+        return observable.subscribeOn(sc).observeOn(sc);
     }
 
 }
diff --git 
a/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/client/ObservableRxInvokerProvider.java
 
b/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/client/ObservableRxInvokerProvider.java
index 221bc481595..5ab701c400d 100644
--- 
a/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/client/ObservableRxInvokerProvider.java
+++ 
b/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/client/ObservableRxInvokerProvider.java
@@ -24,17 +24,12 @@
 import javax.ws.rs.client.SyncInvoker;
 import javax.ws.rs.ext.Provider;
 
-import org.apache.cxf.jaxrs.client.SyncInvokerImpl;
-
 @Provider
 public class ObservableRxInvokerProvider implements 
RxInvokerProvider<ObservableRxInvoker> {
 
     @Override
     public ObservableRxInvoker getRxInvoker(SyncInvoker syncInvoker, 
ExecutorService executorService) {
-        // TODO: At the moment we still delegate if possible to the async HTTP 
conduit.
-        // Investigate if letting the RxJava thread pool deal with the sync 
invocation
-        // is indeed more effective
-        return new 
ObservableRxInvokerImpl(((SyncInvokerImpl)syncInvoker).getWebClient(), 
executorService);
+        return new ObservableRxInvokerImpl(syncInvoker, executorService);
     }
 
     @Override
diff --git 
a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2ObservableTest.java
 
b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2ObservableTest.java
index dc8ef134b64..081a518e691 100644
--- 
a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2ObservableTest.java
+++ 
b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2ObservableTest.java
@@ -19,9 +19,11 @@
 
 package org.apache.cxf.systest.jaxrs.reactive;
 
+import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 
+import javax.ws.rs.core.GenericType;
 import javax.xml.ws.Holder;
 
 import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
@@ -48,6 +50,14 @@ public static void startServers() throws Exception {
                    launchServer(RxJava2ObservableServer.class, true));
         createStaticBus();
     }
+    @Test
+    public void testGetHelloWorldText() throws Exception {
+        String address = "http://localhost:" + PORT + "/rx2/observable/text";
+        WebClient wc = WebClient.create(address);
+        String text = wc.accept("text/plain").get(String.class);
+        assertEquals("Hello, world!", text);
+    }
+
     @Test
     public void testGetHelloWorldJson() throws Exception {
         String address = "http://localhost:" + PORT + 
"/rx2/observable/textJson";
@@ -70,4 +80,25 @@ public void testGetHelloWorldJson() throws Exception {
         assertEquals("Hello", holder.value.getGreeting());
         assertEquals("World", holder.value.getAudience());
     }
+    
+    @Test
+    public void testGetHelloWorldJsonList() throws Exception {
+        String address = "http://localhost:" + PORT + 
"/rx2/observable/textJsonList";
+        doTestGetHelloWorldJsonList(address);
+    } 
+    
+    private void doTestGetHelloWorldJsonList(String address) throws Exception {
+        WebClient wc = WebClient.create(address,
+                                        Collections.singletonList(new 
JacksonJsonProvider()));
+        
WebClient.getConfig(wc).getHttpConduit().getClient().setReceiveTimeout(10000000);
+        GenericType<List<HelloWorldBean>> genericResponseType = new 
GenericType<List<HelloWorldBean>>() {
+        };
+
+        List<HelloWorldBean> beans = 
wc.accept("application/json").get(genericResponseType);
+        assertEquals(2, beans.size());
+        assertEquals("Hello", beans.get(0).getGreeting());
+        assertEquals("World", beans.get(0).getAudience());
+        assertEquals("Ciao", beans.get(1).getGreeting());
+        assertEquals("World", beans.get(1).getAudience());
+    }
 }
diff --git 
a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableService.java
 
b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableService.java
index abb6e7289bf..f08147d1638 100644
--- 
a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableService.java
+++ 
b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableService.java
@@ -19,6 +19,8 @@
 
 package org.apache.cxf.systest.jaxrs.reactive;
 
+import java.util.Arrays;
+import java.util.List;
 
 import javax.ws.rs.GET;
 import javax.ws.rs.Path;
@@ -26,10 +28,15 @@
 
 import io.reactivex.Observable;
 
-
 @Path("/rx2/observable")
 public class RxJava2ObservableService {
 
+    @GET
+    @Produces("text/plain")
+    @Path("text")
+    public Observable<String> getText() {
+        return Observable.just("Hello, world!");
+    }
     
     @GET
     @Produces("application/json")
@@ -37,6 +44,15 @@
     public Observable<HelloWorldBean> getJson() {
         return Observable.just(new HelloWorldBean());
     }
+    
+    @GET
+    @Produces("application/json")
+    @Path("textJsonList")
+    public Observable<List<HelloWorldBean>> getJsonList() {
+        HelloWorldBean bean1 = new HelloWorldBean();
+        HelloWorldBean bean2 = new HelloWorldBean();
+        bean2.setGreeting("Ciao");
+        return Observable.just(Arrays.asList(bean1, bean2));
+    }
+  
 }
-
-


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to