This is an automated email from the ASF dual-hosted git repository.

sergeyb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/cxf.git

commit 8efd3625123ea0eb9c864d2064757756c9290ad7
Author: Sergey Beryozkin <[email protected]>
AuthorDate: Fri Dec 22 17:33:11 2017 +0000

    [CXF-7535] Introducing the common invoker
---
 .../server/AbstractReactiveInvoker.java}           | 44 ++++++++++------------
 rt/rs/extensions/reactor/pom.xml                   |  7 ++++
 .../cxf/jaxrs/reactor/server/ReactorInvoker.java   | 26 ++++++-------
 .../cxf/jaxrs/rx2/server/ReactiveIOInvoker.java    | 30 +--------------
 .../cxf/systest/jaxrs/reactor/FluxReactorTest.java |  5 +++
 .../cxf/systest/jaxrs/reactor/FluxService.java     |  2 +-
 .../cxf/systest/jaxrs/reactor/MonoService.java     |  2 +-
 .../cxf/systest/jaxrs/reactor/ReactorServer.java   | 34 +++++++++++++----
 8 files changed, 74 insertions(+), 76 deletions(-)

diff --git 
a/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/ReactorInvoker.java
 
b/rt/rs/extensions/reactivestreams/src/main/java/org/apache/cxf/jaxrs/reactivestreams/server/AbstractReactiveInvoker.java
similarity index 51%
copy from 
rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/ReactorInvoker.java
copy to 
rt/rs/extensions/reactivestreams/src/main/java/org/apache/cxf/jaxrs/reactivestreams/server/AbstractReactiveInvoker.java
index 79b0428..237a4dc 100644
--- 
a/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/ReactorInvoker.java
+++ 
b/rt/rs/extensions/reactivestreams/src/main/java/org/apache/cxf/jaxrs/reactivestreams/server/AbstractReactiveInvoker.java
@@ -16,39 +16,22 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.cxf.jaxrs.reactor.server;
+package org.apache.cxf.jaxrs.reactivestreams.server;
 
 import java.util.concurrent.CancellationException;
 
+import javax.ws.rs.core.MediaType;
+
 import org.apache.cxf.jaxrs.JAXRSInvoker;
 import org.apache.cxf.jaxrs.impl.AsyncResponseImpl;
 import org.apache.cxf.message.Message;
 
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
 
-public class ReactorInvoker extends JAXRSInvoker {
-    @Override
-    protected AsyncResponseImpl checkFutureResponse(Message inMessage, Object 
result) {
-        if (result instanceof Flux) {
-            final Flux<?> flux = (Flux<?>) result;
-            final AsyncResponseImpl asyncResponse = new 
AsyncResponseImpl(inMessage);
-            flux.doOnNext(asyncResponse::resume)
-                    .doOnError(t -> handleThrowable(asyncResponse, t))
-                    .subscribe();
-            return asyncResponse;
-        } else if (result instanceof Mono) {
-            final Mono<?> flux = (Mono<?>) result;
-            final AsyncResponseImpl asyncResponse = new 
AsyncResponseImpl(inMessage);
-            flux.doOnNext(asyncResponse::resume)
-                .doOnError(t -> handleThrowable(asyncResponse, t))
-                .subscribe();
-            return asyncResponse;
-        }
-        return null;
-    }
+public abstract class AbstractReactiveInvoker extends JAXRSInvoker {
+    private boolean useStreamingSubscriberIfPossible;
+    
     
-    private Object handleThrowable(AsyncResponseImpl asyncResponse, Throwable 
t) {
+    protected Object handleThrowable(AsyncResponseImpl asyncResponse, 
Throwable t) {
         if (t instanceof CancellationException) {
             asyncResponse.cancel();
         } else {
@@ -56,4 +39,17 @@ public class ReactorInvoker extends JAXRSInvoker {
         }
         return null;
     }
+    
+    protected boolean isJsonResponse(Message inMessage) {
+        return 
MediaType.APPLICATION_JSON.equals(inMessage.getExchange().get(Message.CONTENT_TYPE));
+    }
+
+    
+    public boolean isUseStreamingSubscriberIfPossible() {
+        return useStreamingSubscriberIfPossible;
+    }
+
+    public void setUseStreamingSubscriberIfPossible(boolean 
useStreamingSubscriberIfPossible) {
+        this.useStreamingSubscriberIfPossible = 
useStreamingSubscriberIfPossible;
+    }
 }
diff --git a/rt/rs/extensions/reactor/pom.xml b/rt/rs/extensions/reactor/pom.xml
index 40de637..952f73b 100644
--- a/rt/rs/extensions/reactor/pom.xml
+++ b/rt/rs/extensions/reactor/pom.xml
@@ -43,6 +43,13 @@
             <version>${project.version}</version>
         </dependency>
         <dependency>
+            <groupId>org.apache.cxf</groupId>
+            <artifactId>cxf-rt-rs-extension-reactivestreams</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+            <optional>true</optional>
+        </dependency>
+        <dependency>
             <groupId>io.projectreactor</groupId>
             <artifactId>reactor-core</artifactId>
         </dependency>
diff --git 
a/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/ReactorInvoker.java
 
b/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/ReactorInvoker.java
index 79b0428..9e204a0 100644
--- 
a/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/ReactorInvoker.java
+++ 
b/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/ReactorInvoker.java
@@ -18,29 +18,33 @@
  */
 package org.apache.cxf.jaxrs.reactor.server;
 
-import java.util.concurrent.CancellationException;
-
-import org.apache.cxf.jaxrs.JAXRSInvoker;
 import org.apache.cxf.jaxrs.impl.AsyncResponseImpl;
+import org.apache.cxf.jaxrs.reactivestreams.server.AbstractReactiveInvoker;
+import 
org.apache.cxf.jaxrs.reactivestreams.server.JsonStreamingAsyncSubscriber;
 import org.apache.cxf.message.Message;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
-public class ReactorInvoker extends JAXRSInvoker {
+public class ReactorInvoker extends AbstractReactiveInvoker {
+    
     @Override
     protected AsyncResponseImpl checkFutureResponse(Message inMessage, Object 
result) {
         if (result instanceof Flux) {
             final Flux<?> flux = (Flux<?>) result;
             final AsyncResponseImpl asyncResponse = new 
AsyncResponseImpl(inMessage);
-            flux.doOnNext(asyncResponse::resume)
+            if (isUseStreamingSubscriberIfPossible() && 
isJsonResponse(inMessage)) {
+                flux.subscribe(new 
JsonStreamingAsyncSubscriber<>(asyncResponse));
+            } else {
+                flux.doOnNext(asyncResponse::resume)
                     .doOnError(t -> handleThrowable(asyncResponse, t))
                     .subscribe();
+            }
             return asyncResponse;
         } else if (result instanceof Mono) {
-            final Mono<?> flux = (Mono<?>) result;
+            final Mono<?> mono = (Mono<?>) result;
             final AsyncResponseImpl asyncResponse = new 
AsyncResponseImpl(inMessage);
-            flux.doOnNext(asyncResponse::resume)
+            mono.doOnNext(asyncResponse::resume)
                 .doOnError(t -> handleThrowable(asyncResponse, t))
                 .subscribe();
             return asyncResponse;
@@ -48,12 +52,4 @@ public class ReactorInvoker extends JAXRSInvoker {
         return null;
     }
     
-    private Object handleThrowable(AsyncResponseImpl asyncResponse, Throwable 
t) {
-        if (t instanceof CancellationException) {
-            asyncResponse.cancel();
-        } else {
-            asyncResponse.resume(t);
-        }
-        return null;
-    }
 }
diff --git 
a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOInvoker.java
 
b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOInvoker.java
index 6c99a2c..e113d40 100644
--- 
a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOInvoker.java
+++ 
b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOInvoker.java
@@ -18,12 +18,8 @@
  */
 package org.apache.cxf.jaxrs.rx2.server;
 
-import java.util.concurrent.CancellationException;
-
-import javax.ws.rs.core.MediaType;
-
-import org.apache.cxf.jaxrs.JAXRSInvoker;
 import org.apache.cxf.jaxrs.impl.AsyncResponseImpl;
+import org.apache.cxf.jaxrs.reactivestreams.server.AbstractReactiveInvoker;
 import 
org.apache.cxf.jaxrs.reactivestreams.server.JsonStreamingAsyncSubscriber;
 import org.apache.cxf.message.Message;
 
@@ -31,9 +27,7 @@ import io.reactivex.Flowable;
 import io.reactivex.Observable;
 import io.reactivex.Single;
 
-//Work in Progress
-public class ReactiveIOInvoker extends JAXRSInvoker {
-    private boolean useStreamingSubscriberIfPossible;
+public class ReactiveIOInvoker extends AbstractReactiveInvoker {
     protected AsyncResponseImpl checkFutureResponse(Message inMessage, Object 
result) {
         if (result instanceof Flowable) {
             return handleFlowable(inMessage, (Flowable<?>)result);
@@ -62,30 +56,10 @@ public class ReactiveIOInvoker extends JAXRSInvoker {
         return asyncResponse;
     }
     
-    protected boolean isJsonResponse(Message inMessage) {
-        return 
MediaType.APPLICATION_JSON.equals(inMessage.getExchange().get(Message.CONTENT_TYPE));
-    }
-
     protected AsyncResponseImpl handleObservable(Message inMessage, 
Observable<?> obs) {
         final AsyncResponseImpl asyncResponse = new 
AsyncResponseImpl(inMessage);
         obs.subscribe(v -> asyncResponse.resume(v), t -> 
handleThrowable(asyncResponse, t));
         return asyncResponse;
     }
 
-    private Object handleThrowable(AsyncResponseImpl asyncResponse, Throwable 
t) {
-        if (t instanceof CancellationException) {
-            asyncResponse.cancel();
-        } else {
-            asyncResponse.resume(t);
-        }
-        return null;
-    }
-
-    public boolean isUseStreamingSubscriberIfPossible() {
-        return useStreamingSubscriberIfPossible;
-    }
-
-    public void setUseStreamingSubscriberIfPossible(boolean 
useStreamingSubscriberIfPossible) {
-        this.useStreamingSubscriberIfPossible = 
useStreamingSubscriberIfPossible;
-    }
 }
diff --git 
a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/FluxReactorTest.java
 
b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/FluxReactorTest.java
index 3065dd5..46a83b1 100644
--- 
a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/FluxReactorTest.java
+++ 
b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/FluxReactorTest.java
@@ -64,6 +64,11 @@ public class FluxReactorTest extends 
AbstractBusClientServerTestBase {
         String address = "http://localhost:"; + PORT + 
"/reactor/flux/textJsonImplicitListAsyncStream";
         doTestTextJsonImplicitListAsyncStream(address);
     }
+    @Test
+    public void testTextJsonImplicitListAsyncStream2() throws Exception {
+        String address = "http://localhost:"; + PORT + 
"/reactor2/flux/textJsonImplicitListAsyncStream2";
+        doTestTextJsonImplicitListAsyncStream(address);
+    }
     private void doTestTextJsonImplicitListAsyncStream(String address) throws 
Exception {
         List<HelloWorldBean> holder = new ArrayList<>();
         ClientBuilder.newClient()
diff --git 
a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/FluxService.java
 
b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/FluxService.java
index d96d713..fb8d12b 100644
--- 
a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/FluxService.java
+++ 
b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/FluxService.java
@@ -28,7 +28,7 @@ import 
org.apache.cxf.jaxrs.reactivestreams.server.JsonStreamingAsyncSubscriber;
 import reactor.core.publisher.Flux;
 import reactor.core.scheduler.Schedulers;
 
-@Path("/reactor/flux")
+@Path("/flux")
 public class FluxService {
 
     @GET
diff --git 
a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoService.java
 
b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoService.java
index 8ded540..1eab9f4 100644
--- 
a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoService.java
+++ 
b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoService.java
@@ -29,7 +29,7 @@ import 
org.apache.cxf.jaxrs.reactivestreams.server.JsonStreamingAsyncSubscriber;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
 
-@Path("/reactor/mono")
+@Path("/mono")
 public class MonoService {
 
     @GET
diff --git 
a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/ReactorServer.java
 
b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/ReactorServer.java
index 831d903..c056dbc 100644
--- 
a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/ReactorServer.java
+++ 
b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/ReactorServer.java
@@ -33,8 +33,9 @@ import 
org.apache.cxf.testutil.common.AbstractBusTestServerBase;
 public class ReactorServer extends AbstractBusTestServerBase {
     public static final String PORT = allocatePort(ReactorServer.class);
 
-    org.apache.cxf.endpoint.Server server;
-
+    org.apache.cxf.endpoint.Server server1;
+    org.apache.cxf.endpoint.Server server2;
+    
     @Override
     protected void run() {
         Bus bus = BusFactory.getDefaultBus();
@@ -52,15 +53,34 @@ public class ReactorServer extends 
AbstractBusTestServerBase {
                 new SingletonResourceProvider(new FluxService(), true));
         sf.setResourceProvider(MonoService.class,
                 new SingletonResourceProvider(new MonoService(), true));
-        sf.setAddress("http://localhost:"; + PORT + "/");
-        server = sf.create();
+        sf.setAddress("http://localhost:"; + PORT + "/reactor");
+        server1 = sf.create();
+        
+        JAXRSServerFactoryBean sf2 = new JAXRSServerFactoryBean();
+        ReactorInvoker invoker2 = new ReactorInvoker();
+        invoker2.setUseStreamingSubscriberIfPossible(true);
+        sf2.setInvoker(invoker2);
+        StreamingResponseProvider<HelloWorldBean> streamProvider2 = new 
StreamingResponseProvider<HelloWorldBean>();
+        
streamProvider2.setProduceMediaTypes(Collections.singletonList("application/json"));
+        sf2.setProvider(streamProvider2);
+        sf2.setProvider(new JacksonJsonProvider());
+        sf2.getOutInterceptors().add(new LoggingOutInterceptor());
+        sf2.setResourceClasses(FluxService.class);
+        sf2.setResourceProvider(FluxService.class,
+                new SingletonResourceProvider(new FluxService(), true));
+        sf2.setAddress("http://localhost:"; + PORT + "/reactor2");
+        server2 = sf2.create();
     }
 
     @Override
     public void tearDown() throws Exception {
-        server.stop();
-        server.destroy();
-        server = null;
+        server1.stop();
+        server1.destroy();
+        server1 = null;
+        
+        server2.stop();
+        server2.destroy();
+        server2 = null;
     }
 
     public static void main(String[] args) {

-- 
To stop receiving notification emails like this one, please contact
"[email protected]" <[email protected]>.

Reply via email to