This is an automated email from the ASF dual-hosted git repository. sergeyb pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/cxf.git
commit 169427ac954fc606329eac7f18fccd6acff98a07 Author: Sergey Beryozkin <[email protected]> AuthorDate: Fri Dec 22 15:32:34 2017 +0000 [CXF-7535] Preparing ReactorInvoker to handle Flowable sequence --- .../cxf/jaxrs/reactor/server/ReactorInvoker.java | 27 ++++++++++++---------- .../cxf/jaxrs/rx2/server/ReactiveIOInvoker.java | 18 +++++++++++++-- .../jaxrs/reactive/JAXRSRxJava2FlowableTest.java | 11 +++++++++ .../jaxrs/reactive/RxJava2FlowableService.java | 16 ++++++++++++- .../cxf/systest/jaxrs/reactor/FluxReactorTest.java | 3 +++ .../cxf/systest/jaxrs/reactor/FluxService.java | 9 ++++++++ 6 files changed, 69 insertions(+), 15 deletions(-) diff --git a/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/ReactorInvoker.java b/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/ReactorInvoker.java index 6779e04..79b0428 100644 --- a/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/ReactorInvoker.java +++ b/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/ReactorInvoker.java @@ -18,10 +18,12 @@ */ package org.apache.cxf.jaxrs.reactor.server; -import java.util.function.Consumer; +import java.util.concurrent.CancellationException; + import org.apache.cxf.jaxrs.JAXRSInvoker; import org.apache.cxf.jaxrs.impl.AsyncResponseImpl; import org.apache.cxf.message.Message; + import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -32,25 +34,26 @@ public class ReactorInvoker extends JAXRSInvoker { final Flux<?> flux = (Flux<?>) result; final AsyncResponseImpl asyncResponse = new AsyncResponseImpl(inMessage); flux.doOnNext(asyncResponse::resume) - .doOnError(asyncResponse::resume) - .doOnComplete(asyncResponse::onComplete) + .doOnError(t -> handleThrowable(asyncResponse, t)) .subscribe(); return asyncResponse; } else if (result instanceof Mono) { - // mono is only 0 or 1 element, so when something comes in need to complete the async final Mono<?> flux = (Mono<?>) result; final AsyncResponseImpl asyncResponse = new AsyncResponseImpl(inMessage); - flux.doOnNext((Consumer<Object>) o -> { - asyncResponse.resume(o); - asyncResponse.onComplete(); - }) - .doOnError((Consumer<Throwable>) throwable -> { - asyncResponse.resume(throwable); - asyncResponse.onComplete(); - }) + flux.doOnNext(asyncResponse::resume) + .doOnError(t -> handleThrowable(asyncResponse, t)) .subscribe(); return asyncResponse; } return null; } + + private Object handleThrowable(AsyncResponseImpl asyncResponse, Throwable t) { + if (t instanceof CancellationException) { + asyncResponse.cancel(); + } else { + asyncResponse.resume(t); + } + return null; + } } diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOInvoker.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOInvoker.java index 44ac9c7..6c99a2c 100644 --- a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOInvoker.java +++ b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOInvoker.java @@ -18,6 +18,8 @@ */ package org.apache.cxf.jaxrs.rx2.server; +import java.util.concurrent.CancellationException; + import javax.ws.rs.core.MediaType; import org.apache.cxf.jaxrs.JAXRSInvoker; @@ -27,6 +29,7 @@ import org.apache.cxf.message.Message; import io.reactivex.Flowable; import io.reactivex.Observable; +import io.reactivex.Single; //Work in Progress public class ReactiveIOInvoker extends JAXRSInvoker { @@ -34,6 +37,8 @@ public class ReactiveIOInvoker extends JAXRSInvoker { protected AsyncResponseImpl checkFutureResponse(Message inMessage, Object result) { if (result instanceof Flowable) { return handleFlowable(inMessage, (Flowable<?>)result); + } else if (result instanceof Single) { + return handleSingle(inMessage, (Single<?>)result); } else if (result instanceof Observable) { return handleObservable(inMessage, (Observable<?>)result); } else { @@ -41,6 +46,12 @@ public class ReactiveIOInvoker extends JAXRSInvoker { } } + protected AsyncResponseImpl handleSingle(Message inMessage, Single<?> single) { + final AsyncResponseImpl asyncResponse = new AsyncResponseImpl(inMessage); + single.subscribe(v -> asyncResponse.resume(v), t -> handleThrowable(asyncResponse, t)); + return asyncResponse; + } + protected AsyncResponseImpl handleFlowable(Message inMessage, Flowable<?> f) { final AsyncResponseImpl asyncResponse = new AsyncResponseImpl(inMessage); if (isUseStreamingSubscriberIfPossible() && isJsonResponse(inMessage)) { @@ -62,8 +73,11 @@ public class ReactiveIOInvoker extends JAXRSInvoker { } private Object handleThrowable(AsyncResponseImpl asyncResponse, Throwable t) { - //TODO: if it is a Cancelation exception => asyncResponse.cancel(); - asyncResponse.resume(t); + if (t instanceof CancellationException) { + asyncResponse.cancel(); + } else { + asyncResponse.resume(t); + } return null; } diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2FlowableTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2FlowableTest.java index 479f95b..ddfa9f4 100644 --- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2FlowableTest.java +++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2FlowableTest.java @@ -111,6 +111,17 @@ public class JAXRSRxJava2FlowableTest extends AbstractBusClientServerTestBase { } @Test + public void testGetHelloWorldJsonSingle() throws Exception { + String address = "http://localhost:" + PORT + "/rx22/flowable/textJsonSingle"; + WebClient wc = WebClient.create(address, + Collections.singletonList(new JacksonJsonProvider())); + + HelloWorldBean bean = wc.accept("application/json").get(HelloWorldBean.class); + assertEquals("Hello", bean.getGreeting()); + assertEquals("World", bean.getAudience()); + } + + @Test public void testGetHelloWorldAsyncObservable() throws Exception { String address = "http://localhost:" + PORT + "/rx2/flowable/textAsync"; WebClient wc = WebClient.create(address, diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2FlowableService.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2FlowableService.java index d4be1ea..b89f368 100644 --- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2FlowableService.java +++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2FlowableService.java @@ -22,6 +22,7 @@ package org.apache.cxf.systest.jaxrs.reactive; import java.util.LinkedList; import java.util.List; +import java.util.concurrent.CompletableFuture; import javax.ws.rs.GET; import javax.ws.rs.Path; @@ -34,6 +35,7 @@ import org.apache.cxf.jaxrs.reactivestreams.server.JsonStreamingAsyncSubscriber; import io.reactivex.BackpressureStrategy; import io.reactivex.Flowable; +import io.reactivex.Single; import io.reactivex.schedulers.Schedulers; @@ -71,7 +73,7 @@ public class RxJava2FlowableService { @Path("textJsonImplicitListAsyncStream") public void getJsonImplicitListStreamingAsync(@Suspended AsyncResponse ar) { Flowable.just("Hello", "Ciao") - .map(s -> new HelloWorldBean(s)) + .map(HelloWorldBean::new) .subscribeOn(Schedulers.computation()) .subscribe(new JsonStreamingAsyncSubscriber<HelloWorldBean>(ar)); } @@ -92,6 +94,18 @@ public class RxJava2FlowableService { }, BackpressureStrategy.MISSING); } + @GET + @Produces("application/json") + @Path("textJsonSingle") + public Single<HelloWorldBean> getJsonSingle() { + CompletableFuture<HelloWorldBean> completableFuture = CompletableFuture + .supplyAsync(() -> { + sleep(); + return new HelloWorldBean("Hello"); + }); + return Single.fromFuture(completableFuture); + } + private static void sleep() { try { Thread.sleep(1000); diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/FluxReactorTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/FluxReactorTest.java index f6ac2ac..3065dd5 100644 --- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/FluxReactorTest.java +++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/FluxReactorTest.java @@ -62,6 +62,9 @@ public class FluxReactorTest extends AbstractBusClientServerTestBase { @Test public void testTextJsonImplicitListAsyncStream() throws Exception { String address = "http://localhost:" + PORT + "/reactor/flux/textJsonImplicitListAsyncStream"; + doTestTextJsonImplicitListAsyncStream(address); + } + private void doTestTextJsonImplicitListAsyncStream(String address) throws Exception { List<HelloWorldBean> holder = new ArrayList<>(); ClientBuilder.newClient() .register(new JacksonJsonProvider()) diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/FluxService.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/FluxService.java index 79a6c7f..d96d713 100644 --- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/FluxService.java +++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/FluxService.java @@ -47,4 +47,13 @@ public class FluxService { .subscribeOn(Schedulers.parallel()) .subscribe(new JsonStreamingAsyncSubscriber<>(ar)); } + + @GET + @Produces("application/json") + @Path("textJsonImplicitListAsyncStream2") + public Flux<HelloWorldBean> getJsonImplicitListStreamingAsync2() { + return Flux.just("Hello", "Ciao") + .map(HelloWorldBean::new) + .subscribeOn(Schedulers.parallel()); + } } -- To stop receiving notification emails like this one, please contact "[email protected]" <[email protected]>.
