This is an automated email from the ASF dual-hosted git repository. reta pushed a commit to branch 3.2.x-fixes in repository https://gitbox.apache.org/repos/asf/cxf.git
commit 80ae9864fa22a4f07125efdeeb26d03a4de1246c Author: Andriy Redko <[email protected]> AuthorDate: Sat Nov 28 10:10:52 2020 -0500 CXF-8357: RxJava2/RxJava3: Add support of Maybe type (#732) (cherry picked from commit 74ede52ee9594bad61608150d441f1d9a49701aa) (cherry picked from commit 230d5ea2972a87d3e740c4ddc6873debb677fe4b) --- .../cxf/jaxrs/rx2/server/ReactiveIOInvoker.java | 18 +++ .../cxf/jaxrs/rx3/server/ReactiveIOInvoker.java | 18 +++ .../jaxrs/reactive/JAXRSRxJava2MaybeTest.java | 136 +++++++++++++++++++++ .../jaxrs/reactive/JAXRSRxJava3MaybeTest.java | 136 +++++++++++++++++++++ .../systest/jaxrs/reactive/RxJava2MaybeServer.java | 73 +++++++++++ .../RxJava2MaybeService.java} | 53 ++++---- .../systest/jaxrs/reactive/RxJava3MaybeServer.java | 73 +++++++++++ .../RxJava3MaybeService.java} | 53 ++++---- .../cxf/systest/jaxrs/reactor/MonoReactorTest.java | 18 +++ .../cxf/systest/jaxrs/reactor/MonoService.java | 7 +- 10 files changed, 536 insertions(+), 49 deletions(-) diff --git a/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOInvoker.java b/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOInvoker.java index 4b87432..4bf5a43 100644 --- a/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOInvoker.java +++ b/rt/rs/extensions/rx2/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOInvoker.java @@ -25,6 +25,7 @@ import org.apache.cxf.jaxrs.reactivestreams.server.AbstractReactiveInvoker; import org.apache.cxf.message.Message; import io.reactivex.Flowable; +import io.reactivex.Maybe; import io.reactivex.Observable; import io.reactivex.Single; import io.reactivex.disposables.Disposable; @@ -37,10 +38,21 @@ public class ReactiveIOInvoker extends AbstractReactiveInvoker { return handleSingle(inMessage, (Single<?>)result); } else if (result instanceof Observable) { return handleObservable(inMessage, (Observable<?>)result); + } else if (result instanceof Maybe) { + return handleMaybe(inMessage, (Maybe<?>)result); } return null; } + protected AsyncResponseImpl handleMaybe(Message inMessage, Maybe<?> maybe) { + final AsyncResponseImpl asyncResponse = new AsyncResponseImpl(inMessage); + Disposable d = subscribe(maybe, asyncResponse); + if (d == null) { + throw new IllegalStateException("Subscribe did not return a Disposable"); + } + return asyncResponse; + } + protected AsyncResponseImpl handleSingle(Message inMessage, Single<?> single) { final AsyncResponseImpl asyncResponse = new AsyncResponseImpl(inMessage); Disposable d = single.subscribe(asyncResponse::resume, t -> handleThrowable(asyncResponse, t)); @@ -81,4 +93,10 @@ public class ReactiveIOInvoker extends AbstractReactiveInvoker { .switchIfEmpty(Observable.<T>empty().doOnComplete(() -> asyncResponse.resume(Collections.emptyList()))) .subscribe(asyncResponse::resume, t -> handleThrowable(asyncResponse, t)); } + + private <T> Disposable subscribe(Maybe<T> maybe, final AsyncResponseImpl asyncResponse) { + return maybe + .switchIfEmpty(Maybe.<T>empty().doOnComplete(() -> asyncResponse.resume(null))) + .subscribe(asyncResponse::resume, t -> handleThrowable(asyncResponse, t)); + } } diff --git a/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/server/ReactiveIOInvoker.java b/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/server/ReactiveIOInvoker.java index 4d6be4f..c9162d9 100644 --- a/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/server/ReactiveIOInvoker.java +++ b/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/server/ReactiveIOInvoker.java @@ -25,6 +25,7 @@ import org.apache.cxf.jaxrs.reactivestreams.server.AbstractReactiveInvoker; import org.apache.cxf.message.Message; import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.core.Maybe; import io.reactivex.rxjava3.core.Observable; import io.reactivex.rxjava3.core.Single; import io.reactivex.rxjava3.disposables.Disposable; @@ -37,10 +38,21 @@ public class ReactiveIOInvoker extends AbstractReactiveInvoker { return handleSingle(inMessage, (Single<?>)result); } else if (result instanceof Observable) { return handleObservable(inMessage, (Observable<?>)result); + } else if (result instanceof Maybe) { + return handleMaybe(inMessage, (Maybe<?>)result); } return null; } + protected AsyncResponseImpl handleMaybe(Message inMessage, Maybe<?> maybe) { + final AsyncResponseImpl asyncResponse = new AsyncResponseImpl(inMessage); + Disposable d = subscribe(maybe, asyncResponse); + if (d == null) { + throw new IllegalStateException("Subscribe did not return a Disposable"); + } + return asyncResponse; + } + protected AsyncResponseImpl handleSingle(Message inMessage, Single<?> single) { final AsyncResponseImpl asyncResponse = new AsyncResponseImpl(inMessage); Disposable d = single.subscribe(asyncResponse::resume, t -> handleThrowable(asyncResponse, t)); @@ -81,4 +93,10 @@ public class ReactiveIOInvoker extends AbstractReactiveInvoker { .switchIfEmpty(Observable.<T>empty().doOnComplete(() -> asyncResponse.resume(Collections.emptyList()))) .subscribe(asyncResponse::resume, t -> handleThrowable(asyncResponse, t)); } + + private <T> Disposable subscribe(Maybe<T> maybe, final AsyncResponseImpl asyncResponse) { + return maybe + .switchIfEmpty(Maybe.<T>empty().doOnComplete(() -> asyncResponse.resume(null))) + .subscribe(asyncResponse::resume, t -> handleThrowable(asyncResponse, t)); + } } diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2MaybeTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2MaybeTest.java new file mode 100644 index 0000000..f4f351e --- /dev/null +++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2MaybeTest.java @@ -0,0 +1,136 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cxf.systest.jaxrs.reactive; + +import java.util.concurrent.TimeUnit; + +import javax.ws.rs.InternalServerErrorException; +import javax.ws.rs.client.ClientBuilder; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; + +import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider; + +import org.apache.cxf.jaxrs.model.AbstractResourceInfo; +import org.apache.cxf.jaxrs.rx2.client.FlowableRxInvoker; +import org.apache.cxf.jaxrs.rx2.client.FlowableRxInvokerProvider; +import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase; + +import io.reactivex.Flowable; +import io.reactivex.subscribers.TestSubscriber; + +import org.junit.BeforeClass; +import org.junit.Test; + +import static org.junit.Assert.assertTrue; + + +public class JAXRSRxJava2MaybeTest extends AbstractBusClientServerTestBase { + public static final String PORT = RxJava2MaybeServer.PORT; + @BeforeClass + public static void startServers() throws Exception { + AbstractResourceInfo.clearAllMaps(); + assertTrue("server did not launch correctly", launchServer(RxJava2MaybeServer.class, true)); + createStaticBus(); + } + @Test + public void testGetHelloWorldJson() throws Exception { + String address = "http://localhost:" + PORT + "/rx2/maybe/textJson"; + + final Flowable<HelloWorldBean> obs = ClientBuilder + .newClient() + .register(new JacksonJsonProvider()) + .register(new FlowableRxInvokerProvider()) + .target(address) + .request(MediaType.APPLICATION_JSON) + .rx(FlowableRxInvoker.class) + .get(HelloWorldBean.class); + + final TestSubscriber<HelloWorldBean> subscriber = new TestSubscriber<>(); + obs.subscribe(subscriber); + + subscriber.await(3, TimeUnit.SECONDS); + subscriber + .assertValue(r -> r.getGreeting().equals("Hello") && r.getAudience().equals("World")) + .assertComplete(); + } + + @Test + public void testGetString() throws Exception { + String address = "http://localhost:" + PORT + "/rx2/maybe/textAsync"; + + final Flowable<String> obs = ClientBuilder + .newClient() + .register(new FlowableRxInvokerProvider()) + .target(address) + .request(MediaType.TEXT_PLAIN) + .rx(FlowableRxInvoker.class) + .get(String.class); + + final TestSubscriber<String> subscriber = new TestSubscriber<>(); + obs.subscribe(subscriber); + + subscriber.await(3, TimeUnit.SECONDS); + subscriber + .assertValue(r -> "Hello, world!".equals(r)) + .assertComplete(); + } + + @Test + public void testGetError() throws Exception { + String address = "http://localhost:" + PORT + "/rx2/maybe/error"; + + final Flowable<String> obs = ClientBuilder + .newClient() + .register(new FlowableRxInvokerProvider()) + .target(address) + .request(MediaType.APPLICATION_JSON) + .rx(FlowableRxInvoker.class) + .get(String.class); + + final TestSubscriber<String> subscriber = new TestSubscriber<>(); + obs.subscribe(subscriber); + + subscriber.await(3, TimeUnit.SECONDS); + subscriber.assertError(InternalServerErrorException.class); + } + + @Test + public void testGetHelloWorldEmpty() throws Exception { + String address = "http://localhost:" + PORT + "/rx2/maybe/empty"; + + final Flowable<Response> obs = ClientBuilder + .newClient() + .register(new JacksonJsonProvider()) + .register(new FlowableRxInvokerProvider()) + .target(address) + .request(MediaType.APPLICATION_JSON) + .rx(FlowableRxInvoker.class) + .get(); + + final TestSubscriber<Response> subscriber = new TestSubscriber<>(); + obs.subscribe(subscriber); + + subscriber.await(3, TimeUnit.SECONDS); + subscriber + .assertValue(r -> !r.hasEntity()) + .assertComplete(); + } +} diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava3MaybeTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava3MaybeTest.java new file mode 100644 index 0000000..1c4c95e --- /dev/null +++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava3MaybeTest.java @@ -0,0 +1,136 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cxf.systest.jaxrs.reactive; + +import java.util.concurrent.TimeUnit; + +import javax.ws.rs.InternalServerErrorException; +import javax.ws.rs.client.ClientBuilder; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; + +import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider; + +import org.apache.cxf.jaxrs.model.AbstractResourceInfo; +import org.apache.cxf.jaxrs.rx3.client.FlowableRxInvoker; +import org.apache.cxf.jaxrs.rx3.client.FlowableRxInvokerProvider; +import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase; + +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.subscribers.TestSubscriber; + +import org.junit.BeforeClass; +import org.junit.Test; + +import static org.junit.Assert.assertTrue; + + +public class JAXRSRxJava3MaybeTest extends AbstractBusClientServerTestBase { + public static final String PORT = RxJava3MaybeServer.PORT; + @BeforeClass + public static void startServers() throws Exception { + AbstractResourceInfo.clearAllMaps(); + assertTrue("server did not launch correctly", launchServer(RxJava3MaybeServer.class, true)); + createStaticBus(); + } + @Test + public void testGetHelloWorldJson() throws Exception { + String address = "http://localhost:" + PORT + "/rx3/maybe/textJson"; + + final Flowable<HelloWorldBean> obs = ClientBuilder + .newClient() + .register(new JacksonJsonProvider()) + .register(new FlowableRxInvokerProvider()) + .target(address) + .request(MediaType.APPLICATION_JSON) + .rx(FlowableRxInvoker.class) + .get(HelloWorldBean.class); + + final TestSubscriber<HelloWorldBean> subscriber = new TestSubscriber<>(); + obs.subscribe(subscriber); + + subscriber.await(3, TimeUnit.SECONDS); + subscriber + .assertValue(r -> r.getGreeting().equals("Hello") && r.getAudience().equals("World")) + .assertComplete(); + } + + @Test + public void testGetString() throws Exception { + String address = "http://localhost:" + PORT + "/rx3/maybe/textAsync"; + + final Flowable<String> obs = ClientBuilder + .newClient() + .register(new FlowableRxInvokerProvider()) + .target(address) + .request(MediaType.TEXT_PLAIN) + .rx(FlowableRxInvoker.class) + .get(String.class); + + final TestSubscriber<String> subscriber = new TestSubscriber<>(); + obs.subscribe(subscriber); + + subscriber.await(3, TimeUnit.SECONDS); + subscriber + .assertValue(r -> "Hello, world!".equals(r)) + .assertComplete(); + } + + @Test + public void testGetError() throws Exception { + String address = "http://localhost:" + PORT + "/rx3/maybe/error"; + + final Flowable<String> obs = ClientBuilder + .newClient() + .register(new FlowableRxInvokerProvider()) + .target(address) + .request(MediaType.APPLICATION_JSON) + .rx(FlowableRxInvoker.class) + .get(String.class); + + final TestSubscriber<String> subscriber = new TestSubscriber<>(); + obs.subscribe(subscriber); + + subscriber.await(3, TimeUnit.SECONDS); + subscriber.assertError(InternalServerErrorException.class); + } + + @Test + public void testGetHelloWorldEmpty() throws Exception { + String address = "http://localhost:" + PORT + "/rx3/maybe/empty"; + + final Flowable<Response> obs = ClientBuilder + .newClient() + .register(new JacksonJsonProvider()) + .register(new FlowableRxInvokerProvider()) + .target(address) + .request(MediaType.APPLICATION_JSON) + .rx(FlowableRxInvoker.class) + .get(); + + final TestSubscriber<Response> subscriber = new TestSubscriber<>(); + obs.subscribe(subscriber); + + subscriber.await(3, TimeUnit.SECONDS); + subscriber + .assertValue(r -> !r.hasEntity()) + .assertComplete(); + } +} diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2MaybeServer.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2MaybeServer.java new file mode 100644 index 0000000..58a79a2 --- /dev/null +++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2MaybeServer.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cxf.systest.jaxrs.reactive; + +import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider; + +import org.apache.cxf.Bus; +import org.apache.cxf.BusFactory; +import org.apache.cxf.ext.logging.LoggingOutInterceptor; +import org.apache.cxf.jaxrs.JAXRSServerFactoryBean; +import org.apache.cxf.jaxrs.lifecycle.SingletonResourceProvider; +import org.apache.cxf.jaxrs.rx2.server.ReactiveIOCustomizer; +import org.apache.cxf.testutil.common.AbstractBusTestServerBase; + + +public class RxJava2MaybeServer extends AbstractBusTestServerBase { + public static final String PORT = allocatePort(RxJava2MaybeServer.class); + + org.apache.cxf.endpoint.Server server; + public RxJava2MaybeServer() { + } + + protected void run() { + Bus bus = BusFactory.getDefaultBus(); + // Make sure default JSONProvider is not loaded + bus.setProperty("skip.default.json.provider.registration", true); + JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean(); + sf.setProvider(new JacksonJsonProvider()); + new ReactiveIOCustomizer().customize(sf); + sf.getOutInterceptors().add(new LoggingOutInterceptor()); + sf.setResourceClasses(RxJava2MaybeService.class); + sf.setResourceProvider(RxJava2MaybeService.class, + new SingletonResourceProvider(new RxJava2MaybeService(), true)); + sf.setAddress("http://localhost:" + PORT + "/"); + server = sf.create(); + } + + public void tearDown() throws Exception { + server.stop(); + server.destroy(); + server = null; + } + + public static void main(String[] args) { + try { + RxJava2MaybeServer s = new RxJava2MaybeServer(); + s.start(); + } catch (Exception ex) { + ex.printStackTrace(); + System.exit(-1); + } finally { + System.out.println("done!"); + } + } + +} diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoService.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2MaybeService.java similarity index 63% copy from systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoService.java copy to systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2MaybeService.java index 1848266..71e37b8 100644 --- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoService.java +++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2MaybeService.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.cxf.systest.jaxrs.reactor; +package org.apache.cxf.systest.jaxrs.reactive; import javax.ws.rs.GET; import javax.ws.rs.Path; @@ -27,47 +27,52 @@ import javax.ws.rs.container.Suspended; import javax.ws.rs.core.MediaType; import org.apache.cxf.jaxrs.reactivestreams.server.AbstractSubscriber; -import org.apache.cxf.jaxrs.reactivestreams.server.JsonStreamingAsyncSubscriber; -import reactor.core.publisher.Mono; -import reactor.core.scheduler.Schedulers; +import org.apache.cxf.systest.jaxrs.reactor.HelloWorldBean; -@Path("/mono") -public class MonoService { +import io.reactivex.Maybe; - @GET - @Produces("application/json") - @Path("textJson") - public Mono<HelloWorldBean> getJson() { - return Mono.just(new HelloWorldBean()); - } +@Path("/rx2/maybe") +public class RxJava2MaybeService { @GET @Produces("application/json") - @Path("textJsonImplicitListAsyncStream") - public void getJsonImplicitListStreamingAsync(@Suspended AsyncResponse ar) { - Mono.just("Hello") - .map(HelloWorldBean::new) - .subscribeOn(Schedulers.elastic()) - .subscribe(new JsonStreamingAsyncSubscriber<>(ar, null, null, null, 1000, 0)); + @Path("textJson") + public Maybe<HelloWorldBean> getJson() { + return Maybe.just(new HelloWorldBean()); } @GET @Produces("text/plain") @Path("textAsync") public void getTextAsync(@Suspended final AsyncResponse ar) { - Mono.just("Hello, ").map(s -> s + "world!") - .subscribe(new StringAsyncSubscriber(ar)); + final StringAsyncSubscriber subscriber = new StringAsyncSubscriber(ar); + + Maybe + .just("Hello, ") + .map(s -> s + "world!") + .subscribe( + s -> { + subscriber.onNext(s); + subscriber.onComplete(); + }, + subscriber::onError); } @GET + @Produces("application/json") + @Path("error") + public Maybe<HelloWorldBean> getError() { + return Maybe.error(new RuntimeException("Oops")); + } + + @GET @Produces(MediaType.APPLICATION_JSON) - @Path("/empty") - public Mono<HelloWorldBean> empty() { - return Mono.empty(); + @Path("empty") + public Maybe<HelloWorldBean> empty() { + return Maybe.empty(); } - private static class StringAsyncSubscriber extends AbstractSubscriber<String> { StringAsyncSubscriber(AsyncResponse ar) { super(ar); diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3MaybeServer.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3MaybeServer.java new file mode 100644 index 0000000..550f989 --- /dev/null +++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3MaybeServer.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cxf.systest.jaxrs.reactive; + +import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider; + +import org.apache.cxf.Bus; +import org.apache.cxf.BusFactory; +import org.apache.cxf.ext.logging.LoggingOutInterceptor; +import org.apache.cxf.jaxrs.JAXRSServerFactoryBean; +import org.apache.cxf.jaxrs.lifecycle.SingletonResourceProvider; +import org.apache.cxf.jaxrs.rx3.server.ReactiveIOCustomizer; +import org.apache.cxf.testutil.common.AbstractBusTestServerBase; + + +public class RxJava3MaybeServer extends AbstractBusTestServerBase { + public static final String PORT = allocatePort(RxJava3MaybeServer.class); + + org.apache.cxf.endpoint.Server server; + public RxJava3MaybeServer() { + } + + protected void run() { + Bus bus = BusFactory.getDefaultBus(); + // Make sure default JSONProvider is not loaded + bus.setProperty("skip.default.json.provider.registration", true); + JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean(); + sf.setProvider(new JacksonJsonProvider()); + new ReactiveIOCustomizer().customize(sf); + sf.getOutInterceptors().add(new LoggingOutInterceptor()); + sf.setResourceClasses(RxJava3MaybeService.class); + sf.setResourceProvider(RxJava3MaybeService.class, + new SingletonResourceProvider(new RxJava3MaybeService(), true)); + sf.setAddress("http://localhost:" + PORT + "/"); + server = sf.create(); + } + + public void tearDown() throws Exception { + server.stop(); + server.destroy(); + server = null; + } + + public static void main(String[] args) { + try { + RxJava3MaybeServer s = new RxJava3MaybeServer(); + s.start(); + } catch (Exception ex) { + ex.printStackTrace(); + System.exit(-1); + } finally { + System.out.println("done!"); + } + } + +} diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoService.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3MaybeService.java similarity index 63% copy from systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoService.java copy to systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3MaybeService.java index 1848266..4673034 100644 --- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoService.java +++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3MaybeService.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.cxf.systest.jaxrs.reactor; +package org.apache.cxf.systest.jaxrs.reactive; import javax.ws.rs.GET; import javax.ws.rs.Path; @@ -27,47 +27,52 @@ import javax.ws.rs.container.Suspended; import javax.ws.rs.core.MediaType; import org.apache.cxf.jaxrs.reactivestreams.server.AbstractSubscriber; -import org.apache.cxf.jaxrs.reactivestreams.server.JsonStreamingAsyncSubscriber; -import reactor.core.publisher.Mono; -import reactor.core.scheduler.Schedulers; +import org.apache.cxf.systest.jaxrs.reactor.HelloWorldBean; -@Path("/mono") -public class MonoService { +import io.reactivex.rxjava3.core.Maybe; - @GET - @Produces("application/json") - @Path("textJson") - public Mono<HelloWorldBean> getJson() { - return Mono.just(new HelloWorldBean()); - } +@Path("/rx3/maybe") +public class RxJava3MaybeService { @GET @Produces("application/json") - @Path("textJsonImplicitListAsyncStream") - public void getJsonImplicitListStreamingAsync(@Suspended AsyncResponse ar) { - Mono.just("Hello") - .map(HelloWorldBean::new) - .subscribeOn(Schedulers.elastic()) - .subscribe(new JsonStreamingAsyncSubscriber<>(ar, null, null, null, 1000, 0)); + @Path("textJson") + public Maybe<HelloWorldBean> getJson() { + return Maybe.just(new HelloWorldBean()); } @GET @Produces("text/plain") @Path("textAsync") public void getTextAsync(@Suspended final AsyncResponse ar) { - Mono.just("Hello, ").map(s -> s + "world!") - .subscribe(new StringAsyncSubscriber(ar)); + final StringAsyncSubscriber subscriber = new StringAsyncSubscriber(ar); + + Maybe + .just("Hello, ") + .map(s -> s + "world!") + .subscribe( + s -> { + subscriber.onNext(s); + subscriber.onComplete(); + }, + subscriber::onError); } @GET + @Produces("application/json") + @Path("error") + public Maybe<HelloWorldBean> getError() { + return Maybe.error(new RuntimeException("Oops")); + } + + @GET @Produces(MediaType.APPLICATION_JSON) - @Path("/empty") - public Mono<HelloWorldBean> empty() { - return Mono.empty(); + @Path("empty") + public Maybe<HelloWorldBean> empty() { + return Maybe.empty(); } - private static class StringAsyncSubscriber extends AbstractSubscriber<String> { StringAsyncSubscriber(AsyncResponse ar) { super(ar); diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoReactorTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoReactorTest.java index ab62d80..2312f23 100644 --- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoReactorTest.java +++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoReactorTest.java @@ -19,6 +19,7 @@ package org.apache.cxf.systest.jaxrs.reactor; +import javax.ws.rs.InternalServerErrorException; import javax.ws.rs.client.ClientBuilder; import javax.ws.rs.core.MediaType; @@ -111,4 +112,21 @@ public class MonoReactorTest extends AbstractBusClientServerTestBase { .expectComplete() .verify(); } + + @Test + public void testGetError() throws Exception { + String address = "http://localhost:" + PORT + "/reactor/mono/error"; + + StepVerifier + .create(ClientBuilder + .newClient() + .register(new JacksonJsonProvider()) + .register(new ReactorInvokerProvider()) + .target(address) + .request(MediaType.APPLICATION_JSON) + .rx(ReactorInvoker.class) + .get(HelloWorldBean.class)) + .expectErrorMatches(ex -> ex.getCause() instanceof InternalServerErrorException) + .verify(); + } } diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoService.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoService.java index 1848266..7a68dba 100644 --- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoService.java +++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoService.java @@ -57,7 +57,6 @@ public class MonoService { public void getTextAsync(@Suspended final AsyncResponse ar) { Mono.just("Hello, ").map(s -> s + "world!") .subscribe(new StringAsyncSubscriber(ar)); - } @GET @@ -67,6 +66,12 @@ public class MonoService { return Mono.empty(); } + @GET + @Produces("application/json") + @Path("error") + public Mono<HelloWorldBean> getError() { + return Mono.error(new RuntimeException("Oops")); + } private static class StringAsyncSubscriber extends AbstractSubscriber<String> { StringAsyncSubscriber(AsyncResponse ar) {
