This is an automated email from the ASF dual-hosted git repository. reta pushed a commit to branch 3.3.x-fixes in repository https://gitbox.apache.org/repos/asf/cxf.git
commit ec0ad05e9720d8b4b893fc4ee2ca8ca1ed30cf7d Author: reta <[email protected]> AuthorDate: Sat Oct 10 18:22:38 2020 -0400 CXF-8355: RxJava3: Observable / Flowable Returns Mixed Response on Errors and Hangs when Empty (cherry picked from commit 5fded377430dcfd461cf8714ec893cbf18f606a4) --- .../cxf/jaxrs/rx3/server/ReactiveIOCustomizer.java | 2 + .../cxf/jaxrs/rx3/server/ReactiveIOInvoker.java | 17 +- .../reactive/IllegalArgumentExceptionMapper.java | 22 +-- .../reactive/IllegalStateExceptionMapper.java | 25 ++- .../jaxrs/reactive/JAXRSRxJava3FlowableTest.java | 183 +++++++++++++++++++++ .../jaxrs/reactive/JAXRSRxJava3ObservableTest.java | 70 ++++++++ .../jaxrs/reactive/JAXRSRxJava3SingleTest.java | 113 +++++++++++++ .../jaxrs/reactive/RxJava3FlowableServer.java | 2 + .../jaxrs/reactive/RxJava3FlowableService.java | 75 +++++++++ .../jaxrs/reactive/RxJava3ObservableServer.java | 5 +- .../jaxrs/reactive/RxJava3ObservableService.java | 25 +++ ...ervableServer.java => RxJava3SingleServer.java} | 18 +- ...vableService.java => RxJava3SingleService.java} | 60 ++++--- 13 files changed, 555 insertions(+), 62 deletions(-) diff --git a/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/server/ReactiveIOCustomizer.java b/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/server/ReactiveIOCustomizer.java index bf07eec..592b9e6 100644 --- a/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/server/ReactiveIOCustomizer.java +++ b/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/server/ReactiveIOCustomizer.java @@ -20,6 +20,7 @@ package org.apache.cxf.jaxrs.rx3.server; import org.apache.cxf.jaxrs.JAXRSServerFactoryBean; import org.apache.cxf.jaxrs.ext.AbstractStreamingResponseExtension; +import org.apache.cxf.jaxrs.reactivestreams.server.ResponseStatusOnlyExceptionMapper; import org.apache.cxf.service.invoker.Invoker; public class ReactiveIOCustomizer extends AbstractStreamingResponseExtension { @@ -31,6 +32,7 @@ public class ReactiveIOCustomizer extends AbstractStreamingResponseExtension { if (useStreamingSubscriber != null) { invoker.setUseStreamingSubscriberIfPossible(useStreamingSubscriber); } + bean.setProvider(new ResponseStatusOnlyExceptionMapper()); return invoker; } } diff --git a/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/server/ReactiveIOInvoker.java b/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/server/ReactiveIOInvoker.java index 6739092..4d6be4f 100644 --- a/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/server/ReactiveIOInvoker.java +++ b/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/server/ReactiveIOInvoker.java @@ -18,6 +18,8 @@ */ package org.apache.cxf.jaxrs.rx3.server; +import java.util.Collections; + import org.apache.cxf.jaxrs.impl.AsyncResponseImpl; import org.apache.cxf.jaxrs.reactivestreams.server.AbstractReactiveInvoker; import org.apache.cxf.message.Message; @@ -51,7 +53,7 @@ public class ReactiveIOInvoker extends AbstractReactiveInvoker { protected AsyncResponseImpl handleFlowable(Message inMessage, Flowable<?> f) { final AsyncResponseImpl asyncResponse = new AsyncResponseImpl(inMessage); if (!isStreamingSubscriberUsed(f, asyncResponse, inMessage)) { - Disposable d = f.subscribe(asyncResponse::resume, t -> handleThrowable(asyncResponse, t)); + Disposable d = subscribe(f, asyncResponse); if (d == null) { throw new IllegalStateException("Subscribe did not return a Disposable"); } @@ -61,11 +63,22 @@ public class ReactiveIOInvoker extends AbstractReactiveInvoker { protected AsyncResponseImpl handleObservable(Message inMessage, Observable<?> obs) { final AsyncResponseImpl asyncResponse = new AsyncResponseImpl(inMessage); - Disposable d = obs.subscribe(v -> asyncResponse.resume(v), t -> handleThrowable(asyncResponse, t)); + Disposable d = subscribe(obs, asyncResponse); if (d == null) { throw new IllegalStateException("Subscribe did not return a Disposable"); } return asyncResponse; } + private <T> Disposable subscribe(final Flowable<T> f, final AsyncResponseImpl asyncResponse) { + return f + .switchIfEmpty(Flowable.<T>empty().doOnComplete(() -> asyncResponse.resume(Collections.emptyList()))) + .subscribe(asyncResponse::resume, t -> handleThrowable(asyncResponse, t)); + } + + private <T> Disposable subscribe(final Observable<T> obs, final AsyncResponseImpl asyncResponse) { + return obs + .switchIfEmpty(Observable.<T>empty().doOnComplete(() -> asyncResponse.resume(Collections.emptyList()))) + .subscribe(asyncResponse::resume, t -> handleThrowable(asyncResponse, t)); + } } diff --git a/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/server/ReactiveIOCustomizer.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/IllegalArgumentExceptionMapper.java similarity index 53% copy from rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/server/ReactiveIOCustomizer.java copy to systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/IllegalArgumentExceptionMapper.java index bf07eec..a1dc412 100644 --- a/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/server/ReactiveIOCustomizer.java +++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/IllegalArgumentExceptionMapper.java @@ -16,21 +16,17 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.cxf.jaxrs.rx3.server; -import org.apache.cxf.jaxrs.JAXRSServerFactoryBean; -import org.apache.cxf.jaxrs.ext.AbstractStreamingResponseExtension; -import org.apache.cxf.service.invoker.Invoker; +package org.apache.cxf.systest.jaxrs.reactive; -public class ReactiveIOCustomizer extends AbstractStreamingResponseExtension { +import javax.ws.rs.core.Response; +import javax.ws.rs.ext.ExceptionMapper; +import javax.ws.rs.ext.Provider; + +@Provider +public class IllegalArgumentExceptionMapper implements ExceptionMapper<IllegalArgumentException> { @Override - protected Invoker createInvoker(JAXRSServerFactoryBean bean) { - Boolean useStreamingSubscriber = (Boolean)bean.getProperties(true) - .getOrDefault("useStreamingSubscriber", null); - ReactiveIOInvoker invoker = new ReactiveIOInvoker(); - if (useStreamingSubscriber != null) { - invoker.setUseStreamingSubscriberIfPossible(useStreamingSubscriber); - } - return invoker; + public Response toResponse(IllegalArgumentException exception) { + return Response.status(400).build(); } } diff --git a/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/server/ReactiveIOCustomizer.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/IllegalStateExceptionMapper.java similarity index 53% copy from rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/server/ReactiveIOCustomizer.java copy to systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/IllegalStateExceptionMapper.java index bf07eec..95ae19b 100644 --- a/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/server/ReactiveIOCustomizer.java +++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/IllegalStateExceptionMapper.java @@ -16,21 +16,20 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.cxf.jaxrs.rx3.server; -import org.apache.cxf.jaxrs.JAXRSServerFactoryBean; -import org.apache.cxf.jaxrs.ext.AbstractStreamingResponseExtension; -import org.apache.cxf.service.invoker.Invoker; +package org.apache.cxf.systest.jaxrs.reactive; -public class ReactiveIOCustomizer extends AbstractStreamingResponseExtension { +import javax.ws.rs.core.Response; +import javax.ws.rs.ext.ExceptionMapper; +import javax.ws.rs.ext.Provider; + +@Provider +public class IllegalStateExceptionMapper implements ExceptionMapper<IllegalStateException> { @Override - protected Invoker createInvoker(JAXRSServerFactoryBean bean) { - Boolean useStreamingSubscriber = (Boolean)bean.getProperties(true) - .getOrDefault("useStreamingSubscriber", null); - ReactiveIOInvoker invoker = new ReactiveIOInvoker(); - if (useStreamingSubscriber != null) { - invoker.setUseStreamingSubscriberIfPossible(useStreamingSubscriber); - } - return invoker; + public Response toResponse(IllegalStateException exception) { + return Response + .status(409) + .entity(exception) + .build(); } } diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava3FlowableTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava3FlowableTest.java index 9df86bc..980129e 100644 --- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava3FlowableTest.java +++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava3FlowableTest.java @@ -24,10 +24,13 @@ import java.util.LinkedList; import java.util.List; import java.util.concurrent.TimeUnit; +import javax.ws.rs.InternalServerErrorException; import javax.ws.rs.NotFoundException; import javax.ws.rs.client.ClientBuilder; import javax.ws.rs.client.Invocation; import javax.ws.rs.core.GenericType; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider; @@ -64,6 +67,186 @@ public class JAXRSRxJava3FlowableTest extends AbstractBusClientServerTestBase { } @Test + public void testGetHelloWorldEmpty() throws Exception { + String address = "http://localhost:" + PORT + "/rx3/flowable/empty"; + + final Flowable<Response> obs = ClientBuilder + .newClient() + .register(new JacksonJsonProvider()) + .register(new FlowableRxInvokerProvider()) + .target(address) + .request(MediaType.APPLICATION_JSON) + .rx(FlowableRxInvoker.class) + .get(); + + final TestSubscriber<Response> subscriber = new TestSubscriber<>(); + obs.subscribe(subscriber); + + subscriber.await(3, TimeUnit.SECONDS); + subscriber + .assertValue(r -> "[]".equals(r.readEntity(String.class))) + .assertComplete(); + } + + @Test + public void testGetHelloWorldEmpty2() throws Exception { + String address = "http://localhost:" + PORT + "/rx33/flowable/empty"; + + final Flowable<Response> obs = ClientBuilder + .newClient() + .register(new JacksonJsonProvider()) + .register(new FlowableRxInvokerProvider()) + .target(address) + .request(MediaType.APPLICATION_JSON) + .rx(FlowableRxInvoker.class) + .get(); + + final TestSubscriber<Response> subscriber = new TestSubscriber<>(); + obs.subscribe(subscriber); + + subscriber.await(3, TimeUnit.SECONDS); + subscriber + .assertValue(r -> "[]".equals(r.readEntity(String.class))) + .assertComplete(); + } + + @Test + public void testFlowableImmediateErrors() throws Exception { + String address = "http://localhost:" + PORT + "/rx33/flowable/immediate/errors"; + + final Flowable<HelloWorldBean> obs = ClientBuilder + .newClient() + .register(new JacksonJsonProvider()) + .register(new FlowableRxInvokerProvider()) + .target(address) + .request(MediaType.APPLICATION_JSON) + .rx(FlowableRxInvoker.class) + .get(HelloWorldBean.class); + + final TestSubscriber<HelloWorldBean> subscriber = new TestSubscriber<>(); + obs.subscribe(subscriber); + + subscriber.await(3, TimeUnit.SECONDS); + subscriber.assertError(InternalServerErrorException.class); + } + + @Test + public void testFlowableErrorWithExceptionMapperReturnsNoExceptionPayload() throws Exception { + String address = "http://localhost:" + PORT + "/rx33/flowable/mixed/error"; + + final Flowable<Response> obs = ClientBuilder + .newClient() + .register(new JacksonJsonProvider()) + .register(new FlowableRxInvokerProvider()) + .target(address) + .request(MediaType.APPLICATION_JSON) + .rx(FlowableRxInvoker.class) + .get(); + + final TestSubscriber<Response> subscriber = new TestSubscriber<>(); + obs.subscribe(subscriber); + + // The response should not include the exception payload (injected by exception mapper) + // if some elements have been emitted before + subscriber.await(3, TimeUnit.SECONDS); + subscriber + .assertValue(r -> r.getStatus() == 409 && !r.readEntity(String.class).contains("stackTrace")) + .assertComplete(); + } + + @Test + public void testFlowableErrorWithExceptionMapperReturnsContentPayload() throws Exception { + GenericType<List<HelloWorldBean>> helloWorldBeanListType = new GenericType<List<HelloWorldBean>>() { }; + String address = "http://localhost:" + PORT + "/rx33/flowable/mixed/error"; + + final Flowable<Response> obs = ClientBuilder + .newClient() + .register(new JacksonJsonProvider()) + .register(new FlowableRxInvokerProvider()) + .target(address) + .request(MediaType.APPLICATION_JSON) + .rx(FlowableRxInvoker.class) + .get(); + + final TestSubscriber<Response> subscriber = new TestSubscriber<>(); + obs.subscribe(subscriber); + + // The response should include the emitted elements prior the error + subscriber.await(3, TimeUnit.SECONDS); + subscriber + .assertValue(r -> r.getStatus() == 409 && r.readEntity(helloWorldBeanListType).size() == 4) + .assertComplete(); + } + + @Test + public void testFlowableErrorsResponseWithMapper() throws Exception { + String address = "http://localhost:" + PORT + "/rx33/flowable/mapper/errors"; + + final Flowable<Response> obs = ClientBuilder + .newClient() + .register(new JacksonJsonProvider()) + .register(new FlowableRxInvokerProvider()) + .target(address) + .request(MediaType.APPLICATION_JSON) + .rx(FlowableRxInvoker.class) + .get(); + + final TestSubscriber<Response> subscriber = new TestSubscriber<>(); + obs.subscribe(subscriber); + + subscriber.await(3, TimeUnit.SECONDS); + subscriber + .assertValue(r -> r.getStatus() == 400) + .assertComplete(); + } + + @Test + public void testFlowableErrorWithWebException() throws Exception { + String address = "http://localhost:" + PORT + "/rx33/flowable/web/errors"; + + final Flowable<Response> obs = ClientBuilder + .newClient() + .register(new JacksonJsonProvider()) + .register(new FlowableRxInvokerProvider()) + .target(address) + .request(MediaType.APPLICATION_JSON) + .rx(FlowableRxInvoker.class) + .get(); + + final TestSubscriber<Response> subscriber = new TestSubscriber<>(); + obs.subscribe(subscriber); + + // The response should not include the exception payload (injected by exception mapper) + // if some elements have been emitted before + subscriber.await(3, TimeUnit.SECONDS); + subscriber + .assertValue(r -> r.getStatus() == 403 && !r.readEntity(String.class).contains("stackTrace")) + .assertComplete(); + } + + @Test + public void testFlowableImmediateErrorsWithExceptionMapper() throws Exception { + String address = "http://localhost:" + PORT + "/rx33/flowable/immediate/mapper/errors"; + + final Flowable<Response> obs = ClientBuilder + .newClient() + .register(new JacksonJsonProvider()) + .register(new FlowableRxInvokerProvider()) + .target(address) + .request(MediaType.APPLICATION_JSON) + .rx(FlowableRxInvoker.class) + .get(); + + final TestSubscriber<Response> subscriber = new TestSubscriber<>(); + obs.subscribe(subscriber); + + subscriber.await(3, TimeUnit.SECONDS); + subscriber + .assertValue(r -> r.getStatus() == 409 && r.readEntity(String.class).contains("stackTrace")) + .assertComplete(); + } + + @Test public void testGetHelloWorldJson() throws Exception { String address = "http://localhost:" + PORT + "/rx3/flowable/textJson"; List<Object> providers = new LinkedList<>(); diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava3ObservableTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava3ObservableTest.java index 6b66577..f1eb9f0 100644 --- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava3ObservableTest.java +++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava3ObservableTest.java @@ -22,8 +22,13 @@ package org.apache.cxf.systest.jaxrs.reactive; import java.util.Collections; import java.util.LinkedList; import java.util.List; +import java.util.concurrent.TimeUnit; +import javax.ws.rs.InternalServerErrorException; +import javax.ws.rs.client.ClientBuilder; import javax.ws.rs.core.GenericType; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; import javax.xml.ws.Holder; import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider; @@ -36,6 +41,7 @@ import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase; import io.reactivex.rxjava3.core.Observable; import io.reactivex.rxjava3.disposables.Disposable; +import io.reactivex.rxjava3.observers.TestObserver; import org.junit.BeforeClass; import org.junit.Test; @@ -88,7 +94,71 @@ public class JAXRSRxJava3ObservableTest extends AbstractBusClientServerTestBase String address = "http://localhost:" + PORT + "/rx3/observable/textJsonList"; doTestGetHelloWorldJsonList(address); } + + @Test + public void testGetHelloWorldEmpty() throws Exception { + String address = "http://localhost:" + PORT + "/rx3/observable/empty"; + + final Observable<Response> obs = ClientBuilder + .newClient() + .register(new JacksonJsonProvider()) + .register(new ObservableRxInvokerProvider()) + .target(address) + .request(MediaType.APPLICATION_JSON) + .rx(ObservableRxInvoker.class) + .get(); + + final TestObserver<Response> subscriber = new TestObserver<>(); + obs.subscribe(subscriber); + subscriber.await(3, TimeUnit.SECONDS); + subscriber + .assertValue(r -> "[]".equals(r.readEntity(String.class))) + .assertComplete(); + } + + @Test + public void testObservableImmediateErrors() throws Exception { + String address = "http://localhost:" + PORT + "/rx3/observable/immediate/errors"; + + final Observable<HelloWorldBean> obs = ClientBuilder + .newClient() + .register(new JacksonJsonProvider()) + .register(new ObservableRxInvokerProvider()) + .target(address) + .request(MediaType.APPLICATION_JSON) + .rx(ObservableRxInvoker.class) + .get(HelloWorldBean.class); + + final TestObserver<HelloWorldBean> subscriber = new TestObserver<>(); + obs.subscribe(subscriber); + + subscriber.await(3, TimeUnit.SECONDS); + subscriber.assertError(InternalServerErrorException.class); + } + + @Test + public void testObservableImmediateErrorsWithExceptionMapper() throws Exception { + String address = "http://localhost:" + PORT + "/rx3/observable/immediate/mapper/errors"; + + final Observable<Response> obs = ClientBuilder + .newClient() + .register(new JacksonJsonProvider()) + .register(new ObservableRxInvokerProvider()) + .target(address) + .request(MediaType.APPLICATION_JSON) + .rx(ObservableRxInvoker.class) + .get(); + + final TestObserver<Response> subscriber = new TestObserver<>(); + obs.subscribe(subscriber); + + subscriber.await(3, TimeUnit.SECONDS); + subscriber + .assertValue(r -> r.getStatus() == 409 && r.readEntity(String.class).contains("stackTrace")) + .assertComplete(); + } + private void doTestGetHelloWorldJsonList(String address) throws Exception { WebClient wc = WebClient.create(address, Collections.singletonList(new JacksonJsonProvider())); diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava3SingleTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava3SingleTest.java new file mode 100644 index 0000000..563cfcd --- /dev/null +++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava3SingleTest.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cxf.systest.jaxrs.reactive; + +import java.util.concurrent.TimeUnit; + +import javax.ws.rs.InternalServerErrorException; +import javax.ws.rs.client.ClientBuilder; +import javax.ws.rs.core.MediaType; + +import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider; + +import org.apache.cxf.jaxrs.model.AbstractResourceInfo; +import org.apache.cxf.jaxrs.rx3.client.FlowableRxInvoker; +import org.apache.cxf.jaxrs.rx3.client.FlowableRxInvokerProvider; +import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase; + +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.subscribers.TestSubscriber; + +import org.junit.BeforeClass; +import org.junit.Test; + +import static org.junit.Assert.assertTrue; + + +public class JAXRSRxJava3SingleTest extends AbstractBusClientServerTestBase { + public static final String PORT = RxJava3SingleServer.PORT; + @BeforeClass + public static void startServers() throws Exception { + AbstractResourceInfo.clearAllMaps(); + assertTrue("server did not launch correctly", launchServer(RxJava3SingleServer.class, true)); + createStaticBus(); + } + @Test + public void testGetHelloWorldJson() throws Exception { + String address = "http://localhost:" + PORT + "/rx3/single/textJson"; + + final Flowable<HelloWorldBean> obs = ClientBuilder + .newClient() + .register(new JacksonJsonProvider()) + .register(new FlowableRxInvokerProvider()) + .target(address) + .request(MediaType.APPLICATION_JSON) + .rx(FlowableRxInvoker.class) + .get(HelloWorldBean.class); + + final TestSubscriber<HelloWorldBean> subscriber = new TestSubscriber<>(); + obs.subscribe(subscriber); + + subscriber.await(3, TimeUnit.SECONDS); + subscriber + .assertValue(r -> r.getGreeting().equals("Hello") && r.getAudience().equals("World")) + .assertComplete(); + } + + @Test + public void testGetString() throws Exception { + String address = "http://localhost:" + PORT + "/rx3/single/textAsync"; + + final Flowable<String> obs = ClientBuilder + .newClient() + .register(new FlowableRxInvokerProvider()) + .target(address) + .request(MediaType.TEXT_PLAIN) + .rx(FlowableRxInvoker.class) + .get(String.class); + + final TestSubscriber<String> subscriber = new TestSubscriber<>(); + obs.subscribe(subscriber); + + subscriber.await(3, TimeUnit.SECONDS); + subscriber + .assertValue(r -> "Hello, world!".equals(r)) + .assertComplete(); + } + + @Test + public void testGetError() throws Exception { + String address = "http://localhost:" + PORT + "/rx3/single/error"; + + final Flowable<String> obs = ClientBuilder + .newClient() + .register(new FlowableRxInvokerProvider()) + .target(address) + .request(MediaType.APPLICATION_JSON) + .rx(FlowableRxInvoker.class) + .get(String.class); + + final TestSubscriber<String> subscriber = new TestSubscriber<>(); + obs.subscribe(subscriber); + + subscriber.await(3, TimeUnit.SECONDS); + subscriber.assertError(InternalServerErrorException.class); + } +} diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3FlowableServer.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3FlowableServer.java index f78d918..109c2a2 100644 --- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3FlowableServer.java +++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3FlowableServer.java @@ -51,6 +51,8 @@ public class RxJava3FlowableServer extends AbstractBusTestServerBase { JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean(); sf.getProperties(true).put("useStreamingSubscriber", useStreamingSubscriber); sf.setProvider(new JacksonJsonProvider()); + sf.setProvider(new IllegalArgumentExceptionMapper()); + sf.setProvider(new IllegalStateExceptionMapper()); new ReactiveIOCustomizer().customize(sf); sf.getOutInterceptors().add(new LoggingOutInterceptor()); sf.setResourceClasses(RxJava3FlowableService.class); diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3FlowableService.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3FlowableService.java index af67f50..618d3e4 100644 --- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3FlowableService.java +++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3FlowableService.java @@ -24,14 +24,18 @@ import java.util.LinkedList; import java.util.List; import java.util.concurrent.CompletableFuture; +import javax.ws.rs.ForbiddenException; import javax.ws.rs.GET; +import javax.ws.rs.NotFoundException; import javax.ws.rs.Path; import javax.ws.rs.Produces; import javax.ws.rs.container.AsyncResponse; import javax.ws.rs.container.Suspended; +import javax.ws.rs.core.MediaType; import org.apache.cxf.jaxrs.reactivestreams.server.AbstractSubscriber; import org.apache.cxf.jaxrs.reactivestreams.server.JsonStreamingAsyncSubscriber; +import org.apache.cxf.systest.jaxrs.reactor.HelloWorldBean; import io.reactivex.rxjava3.core.BackpressureStrategy; import io.reactivex.rxjava3.core.Flowable; @@ -51,6 +55,77 @@ public class RxJava3FlowableService { } @GET + @Produces(MediaType.APPLICATION_JSON) + @Path("/empty") + public Flowable<HelloWorldBean> empty() { + return Flowable.empty(); + } + + @GET + @Produces(MediaType.APPLICATION_JSON) + @Path("/mapper/errors") + public Flowable<HelloWorldBean> mapperErrors() { + return Flowable + .range(1, 3) + .flatMap(item -> { + if (item < 3) { + return Flowable.just(new HelloWorldBean("Person " + item)); + } else { + return Flowable.error(new IllegalArgumentException("Oops")); + } + }); + } + + @GET + @Produces(MediaType.APPLICATION_JSON) + @Path("/web/errors") + public Flowable<HelloWorldBean> webErrors() { + return Flowable + .range(1, 3) + .concatMap(item -> { + if (item < 3) { + return Flowable.just(new HelloWorldBean("Person " + item)); + } else { + return Flowable.error(new ForbiddenException("Oops")); + } + }); + } + + @GET + @Produces(MediaType.APPLICATION_JSON) + @Path("/immediate/errors") + public Flowable<HelloWorldBean> immediateErrors() { + return Flowable + .range(1, 2) + .flatMap(item -> Flowable.error(new RuntimeException("Oops"))); + } + + @GET + @Produces(MediaType.APPLICATION_JSON) + @Path("/immediate/mapper/errors") + public Flowable<HelloWorldBean> immediateMapperErrors() { + return Flowable + .range(1, 2) + .flatMap(item -> Flowable.error(new IllegalStateException("Oops"))); + } + + @GET + @Path("/mixed/error") + @Produces(MediaType.APPLICATION_JSON) + public Flowable<HelloWorldBean> errorAndData() { + return Flowable + .range(1, 5) + .flatMap(item -> { + if (item <= 4) { + return Flowable.just(new HelloWorldBean(" of Item: " + item)); + } else { + return Flowable.error(new NotFoundException("Item not found")); + } + }) + .onErrorResumeNext(e -> Flowable.error(new IllegalStateException("Oops", e))); + } + + @GET @Produces("application/json") @Path("textJsonImplicitListAsync") public void getJsonImplicitListAsync(@Suspended AsyncResponse ar) { diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3ObservableServer.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3ObservableServer.java index 0a8fcaa..6365bd8 100644 --- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3ObservableServer.java +++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3ObservableServer.java @@ -26,7 +26,7 @@ import org.apache.cxf.BusFactory; import org.apache.cxf.ext.logging.LoggingOutInterceptor; import org.apache.cxf.jaxrs.JAXRSServerFactoryBean; import org.apache.cxf.jaxrs.lifecycle.SingletonResourceProvider; -import org.apache.cxf.jaxrs.rx3.server.ReactiveIOInvoker; +import org.apache.cxf.jaxrs.rx3.server.ReactiveIOCustomizer; import org.apache.cxf.testutil.common.AbstractBusTestServerBase; @@ -42,8 +42,9 @@ public class RxJava3ObservableServer extends AbstractBusTestServerBase { // Make sure default JSONProvider is not loaded bus.setProperty("skip.default.json.provider.registration", true); JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean(); - sf.setInvoker(new ReactiveIOInvoker()); sf.setProvider(new JacksonJsonProvider()); + sf.setProvider(new IllegalStateExceptionMapper()); + new ReactiveIOCustomizer().customize(sf); sf.getOutInterceptors().add(new LoggingOutInterceptor()); sf.setResourceClasses(RxJava3ObservableService.class); sf.setResourceProvider(RxJava3ObservableService.class, diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3ObservableService.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3ObservableService.java index 51d362c..7393d40 100644 --- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3ObservableService.java +++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3ObservableService.java @@ -25,6 +25,7 @@ import java.util.List; import javax.ws.rs.GET; import javax.ws.rs.Path; import javax.ws.rs.Produces; +import javax.ws.rs.core.MediaType; import io.reactivex.rxjava3.core.Observable; @@ -55,4 +56,28 @@ public class RxJava3ObservableService { return Observable.just(Arrays.asList(bean1, bean2)); } + @GET + @Produces(MediaType.APPLICATION_JSON) + @Path("/empty") + public Observable<HelloWorldBean> empty() { + return Observable.empty(); + } + + @GET + @Produces(MediaType.APPLICATION_JSON) + @Path("/immediate/errors") + public Observable<HelloWorldBean> immediateErrors() { + return Observable + .range(1, 2) + .flatMap(item -> Observable.error(new RuntimeException("Oops"))); + } + + @GET + @Produces(MediaType.APPLICATION_JSON) + @Path("/immediate/mapper/errors") + public Observable<HelloWorldBean> immediateMapperErrors() { + return Observable + .range(1, 2) + .flatMap(item -> Observable.error(new IllegalStateException("Oops"))); + } } diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3ObservableServer.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3SingleServer.java similarity index 80% copy from systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3ObservableServer.java copy to systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3SingleServer.java index 0a8fcaa..4b91c33b 100644 --- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3ObservableServer.java +++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3SingleServer.java @@ -26,15 +26,15 @@ import org.apache.cxf.BusFactory; import org.apache.cxf.ext.logging.LoggingOutInterceptor; import org.apache.cxf.jaxrs.JAXRSServerFactoryBean; import org.apache.cxf.jaxrs.lifecycle.SingletonResourceProvider; -import org.apache.cxf.jaxrs.rx3.server.ReactiveIOInvoker; +import org.apache.cxf.jaxrs.rx3.server.ReactiveIOCustomizer; import org.apache.cxf.testutil.common.AbstractBusTestServerBase; -public class RxJava3ObservableServer extends AbstractBusTestServerBase { - public static final String PORT = allocatePort(RxJava3ObservableServer.class); +public class RxJava3SingleServer extends AbstractBusTestServerBase { + public static final String PORT = allocatePort(RxJava3SingleServer.class); org.apache.cxf.endpoint.Server server; - public RxJava3ObservableServer() { + public RxJava3SingleServer() { } protected void run() { @@ -42,12 +42,12 @@ public class RxJava3ObservableServer extends AbstractBusTestServerBase { // Make sure default JSONProvider is not loaded bus.setProperty("skip.default.json.provider.registration", true); JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean(); - sf.setInvoker(new ReactiveIOInvoker()); sf.setProvider(new JacksonJsonProvider()); + new ReactiveIOCustomizer().customize(sf); sf.getOutInterceptors().add(new LoggingOutInterceptor()); - sf.setResourceClasses(RxJava3ObservableService.class); - sf.setResourceProvider(RxJava3ObservableService.class, - new SingletonResourceProvider(new RxJava3ObservableService(), true)); + sf.setResourceClasses(RxJava3SingleService.class); + sf.setResourceProvider(RxJava3SingleService.class, + new SingletonResourceProvider(new RxJava3SingleService(), true)); sf.setAddress("http://localhost:" + PORT + "/"); server = sf.create(); } @@ -60,7 +60,7 @@ public class RxJava3ObservableServer extends AbstractBusTestServerBase { public static void main(String[] args) { try { - RxJava3ObservableServer s = new RxJava3ObservableServer(); + RxJava3SingleServer s = new RxJava3SingleServer(); s.start(); } catch (Exception ex) { ex.printStackTrace(); diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3ObservableService.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3SingleService.java similarity index 50% copy from systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3ObservableService.java copy to systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3SingleService.java index 51d362c..9fc0797 100644 --- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3ObservableService.java +++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3SingleService.java @@ -19,40 +19,54 @@ package org.apache.cxf.systest.jaxrs.reactive; -import java.util.Arrays; -import java.util.List; - import javax.ws.rs.GET; import javax.ws.rs.Path; import javax.ws.rs.Produces; +import javax.ws.rs.container.AsyncResponse; +import javax.ws.rs.container.Suspended; -import io.reactivex.rxjava3.core.Observable; +import org.apache.cxf.jaxrs.reactivestreams.server.AbstractSubscriber; -@Path("/rx3/observable") -public class RxJava3ObservableService { +import io.reactivex.rxjava3.core.Single; + +@Path("/rx3/single") +public class RxJava3SingleService { - @GET - @Produces("text/plain") - @Path("text") - public Observable<String> getText() { - return Observable.just("Hello, world!"); - } - @GET @Produces("application/json") @Path("textJson") - public Observable<HelloWorldBean> getJson() { - return Observable.just(new HelloWorldBean()); + public Single<HelloWorldBean> getJson() { + return Single.just(new HelloWorldBean()); + } + + @GET + @Produces("text/plain") + @Path("textAsync") + public void getTextAsync(@Suspended final AsyncResponse ar) { + final StringAsyncSubscriber subscriber = new StringAsyncSubscriber(ar); + + Single + .just("Hello, ") + .map(s -> s + "world!") + .subscribe( + s -> { + subscriber.onNext(s); + subscriber.onComplete(); + }, + subscriber::onError); + } @GET @Produces("application/json") - @Path("textJsonList") - public Observable<List<HelloWorldBean>> getJsonList() { - HelloWorldBean bean1 = new HelloWorldBean(); - HelloWorldBean bean2 = new HelloWorldBean(); - bean2.setGreeting("Ciao"); - return Observable.just(Arrays.asList(bean1, bean2)); + @Path("error") + public Single<HelloWorldBean> getError() { + return Single.error(new RuntimeException("Oops")); + } + + private static class StringAsyncSubscriber extends AbstractSubscriber<String> { + StringAsyncSubscriber(AsyncResponse ar) { + super(ar); + } } - -} +} \ No newline at end of file
