This is an automated email from the ASF dual-hosted git repository.
reta pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/cxf.git
The following commit(s) were added to refs/heads/master by this push:
new e38fc4a CXF-8078: Support RxJava 3.0
e38fc4a is described below
commit e38fc4aa14d7741369de5d027b30e52def3a0549
Author: reta <[email protected]>
AuthorDate: Mon Apr 13 22:38:32 2020 -0400
CXF-8078: Support RxJava 3.0
---
parent/pom.xml | 6 +
rt/rs/extensions/rx3/pom.xml | 58 ++++++
.../cxf/jaxrs/rx3/client/FlowableRxInvoker.java | 105 +++++++++++
.../jaxrs/rx3/client/FlowableRxInvokerImpl.java | 199 +++++++++++++++++++++
.../rx3/client/FlowableRxInvokerProvider.java | 40 +++++
.../cxf/jaxrs/rx3/client/ObservableRxInvoker.java | 104 +++++++++++
.../jaxrs/rx3/client/ObservableRxInvokerImpl.java | 198 ++++++++++++++++++++
.../rx3/client/ObservableRxInvokerProvider.java | 38 ++++
.../cxf/jaxrs/rx3/server/ReactiveIOCustomizer.java | 36 ++++
.../cxf/jaxrs/rx3/server/ReactiveIOInvoker.java | 71 ++++++++
...rs.ext.JAXRSServerFactoryCustomizationExtension | 1 +
rt/rs/pom.xml | 1 +
systests/jaxrs/pom.xml | 12 ++
.../jaxrs/reactive/JAXRSRxJava3FlowableTest.java | 155 ++++++++++++++++
.../jaxrs/reactive/JAXRSRxJava3ObservableTest.java | 106 +++++++++++
.../jaxrs/reactive/RxJava3FlowableServer.java | 81 +++++++++
.../jaxrs/reactive/RxJava3FlowableService.java | 165 +++++++++++++++++
.../jaxrs/reactive/RxJava3ObservableServer.java | 73 ++++++++
.../jaxrs/reactive/RxJava3ObservableService.java | 58 ++++++
19 files changed, 1507 insertions(+)
diff --git a/parent/pom.xml b/parent/pom.xml
index 25c1466..5f93219 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -185,6 +185,7 @@
<cxf.rhino.version>1.7R2</cxf.rhino.version>
<cxf.rxjava.version>1.3.8</cxf.rxjava.version>
<cxf.rxjava2.version>2.2.19</cxf.rxjava2.version>
+ <cxf.rxjava3.version>3.0.2</cxf.rxjava3.version>
<cxf.servlet-api.artifact>jakarta.servlet-api</cxf.servlet-api.artifact>
<cxf.servlet-api.group>jakarta.servlet</cxf.servlet-api.group>
<cxf.servlet-api.version>4.0.3</cxf.servlet-api.version>
@@ -1428,6 +1429,11 @@
<version>${cxf.rxjava2.version}</version>
</dependency>
<dependency>
+ <groupId>io.reactivex.rxjava3</groupId>
+ <artifactId>rxjava</artifactId>
+ <version>${cxf.rxjava3.version}</version>
+ </dependency>
+ <dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>${cxf.reactor.version}</version>
diff --git a/rt/rs/extensions/rx3/pom.xml b/rt/rs/extensions/rx3/pom.xml
new file mode 100644
index 0000000..93be0ef
--- /dev/null
+++ b/rt/rs/extensions/rx3/pom.xml
@@ -0,0 +1,58 @@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>cxf-rt-rs-extension-rx3</artifactId>
+ <packaging>bundle</packaging>
+ <name>Apache CXF JAX-RS Extensions: RxJava3</name>
+ <description>Apache CXF JAX-RS Extensions: RxJava3</description>
+ <url>https://cxf.apache.org</url>
+ <parent>
+ <groupId>org.apache.cxf</groupId>
+ <artifactId>cxf-parent</artifactId>
+ <version>3.4.0-SNAPSHOT</version>
+ <relativePath>../../../../parent/pom.xml</relativePath>
+ </parent>
+ <properties>
+ <cxf.module.name>org.apache.cxf.rs.rx3</cxf.module.name>
+ </properties>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.cxf</groupId>
+ <artifactId>cxf-rt-frontend-jaxrs</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.cxf</groupId>
+ <artifactId>cxf-rt-rs-client</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.reactivex.rxjava3</groupId>
+ <artifactId>rxjava</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.cxf</groupId>
+ <artifactId>cxf-rt-rs-extension-reactivestreams</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+</project>
diff --git
a/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/client/FlowableRxInvoker.java
b/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/client/FlowableRxInvoker.java
new file mode 100644
index 0000000..4b89736
--- /dev/null
+++
b/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/client/FlowableRxInvoker.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.rx3.client;
+
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.client.RxInvoker;
+import javax.ws.rs.core.GenericType;
+import javax.ws.rs.core.Response;
+
+import io.reactivex.rxjava3.core.Flowable;
+
+
+/**
+ * Reactive JAX-RS client invoker whose operations return RxJava 3 {@link Flowable} instances.
+ * <p>
+ * Every method narrows the wildcard return type declared by {@link RxInvoker} to a concretely
+ * typed {@code Flowable}, so callers do not have to cast the result.
+ */
+public interface FlowableRxInvoker extends RxInvoker<Flowable<?>> {
+    // ---- GET ----
+    @Override
+    Flowable<Response> get();
+
+    @Override
+    <T> Flowable<T> get(Class<T> responseType);
+
+    @Override
+    <T> Flowable<T> get(GenericType<T> responseType);
+
+    // ---- PUT ----
+    @Override
+    Flowable<Response> put(Entity<?> entity);
+
+    @Override
+    <T> Flowable<T> put(Entity<?> entity, Class<T> clazz);
+
+    @Override
+    <T> Flowable<T> put(Entity<?> entity, GenericType<T> type);
+
+    // ---- POST ----
+    @Override
+    Flowable<Response> post(Entity<?> entity);
+
+    @Override
+    <T> Flowable<T> post(Entity<?> entity, Class<T> clazz);
+
+    @Override
+    <T> Flowable<T> post(Entity<?> entity, GenericType<T> type);
+
+    // ---- DELETE ----
+    @Override
+    Flowable<Response> delete();
+
+    @Override
+    <T> Flowable<T> delete(Class<T> responseType);
+
+    @Override
+    <T> Flowable<T> delete(GenericType<T> responseType);
+
+    // ---- HEAD ----
+    @Override
+    Flowable<Response> head();
+
+    // ---- OPTIONS ----
+    @Override
+    Flowable<Response> options();
+
+    @Override
+    <T> Flowable<T> options(Class<T> responseType);
+
+    @Override
+    <T> Flowable<T> options(GenericType<T> responseType);
+
+    // ---- TRACE ----
+    @Override
+    Flowable<Response> trace();
+
+    @Override
+    <T> Flowable<T> trace(Class<T> responseType);
+
+    @Override
+    <T> Flowable<T> trace(GenericType<T> responseType);
+
+    // ---- arbitrary HTTP method, without and with a request entity ----
+    @Override
+    Flowable<Response> method(String name);
+
+    @Override
+    <T> Flowable<T> method(String name, Class<T> responseType);
+
+    @Override
+    <T> Flowable<T> method(String name, GenericType<T> responseType);
+
+    @Override
+    Flowable<Response> method(String name, Entity<?> entity);
+
+    @Override
+    <T> Flowable<T> method(String name, Entity<?> entity, Class<T> responseType);
+
+    @Override
+    <T> Flowable<T> method(String name, Entity<?> entity, GenericType<T> responseType);
+}
+
diff --git
a/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/client/FlowableRxInvokerImpl.java
b/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/client/FlowableRxInvokerImpl.java
new file mode 100644
index 0000000..19f5cac
--- /dev/null
+++
b/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/client/FlowableRxInvokerImpl.java
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.rx3.client;
+
+import java.util.concurrent.ExecutorService;
+import java.util.function.Supplier;
+
+import javax.ws.rs.HttpMethod;
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.client.SyncInvoker;
+import javax.ws.rs.core.GenericType;
+import javax.ws.rs.core.Response;
+
+import io.reactivex.rxjava3.core.BackpressureStrategy;
+import io.reactivex.rxjava3.core.Flowable;
+import io.reactivex.rxjava3.core.FlowableEmitter;
+import io.reactivex.rxjava3.core.FlowableOnSubscribe;
+import io.reactivex.rxjava3.core.Scheduler;
+import io.reactivex.rxjava3.schedulers.Schedulers;
+
+/**
+ * {@link FlowableRxInvoker} implementation that adapts a blocking {@link SyncInvoker} to
+ * RxJava 3 {@link Flowable}s.
+ * <p>
+ * The wrapped synchronous call is performed when the returned {@code Flowable} is subscribed to.
+ * It runs on a scheduler built from the {@link ExecutorService} given at construction time, or
+ * on {@link Schedulers#io()} when no executor was supplied.
+ */
+public class FlowableRxInvokerImpl implements FlowableRxInvoker {
+    /** Scheduler derived from the caller's executor; {@code null} means "use Schedulers.io()". */
+    private final Scheduler sc;
+    private final SyncInvoker syncInvoker;
+
+    /**
+     * @param syncInvoker blocking invoker that actually performs the HTTP exchange
+     * @param ex executor to run invocations on; may be {@code null}
+     */
+    public FlowableRxInvokerImpl(SyncInvoker syncInvoker, ExecutorService ex) {
+        this.syncInvoker = syncInvoker;
+        this.sc = ex == null ? null : Schedulers.from(ex);
+    }
+
+    @Override
+    public Flowable<Response> get() {
+        return get(Response.class);
+    }
+
+    @Override
+    public <T> Flowable<T> get(Class<T> responseType) {
+        return method(HttpMethod.GET, responseType);
+    }
+
+    @Override
+    public <T> Flowable<T> get(GenericType<T> responseType) {
+        return method(HttpMethod.GET, responseType);
+    }
+
+    @Override
+    public Flowable<Response> put(Entity<?> entity) {
+        return put(entity, Response.class);
+    }
+
+    @Override
+    public <T> Flowable<T> put(Entity<?> entity, Class<T> responseType) {
+        return method(HttpMethod.PUT, entity, responseType);
+    }
+
+    @Override
+    public <T> Flowable<T> put(Entity<?> entity, GenericType<T> responseType) {
+        return method(HttpMethod.PUT, entity, responseType);
+    }
+
+    @Override
+    public Flowable<Response> post(Entity<?> entity) {
+        return post(entity, Response.class);
+    }
+
+    @Override
+    public <T> Flowable<T> post(Entity<?> entity, Class<T> responseType) {
+        return method(HttpMethod.POST, entity, responseType);
+    }
+
+    @Override
+    public <T> Flowable<T> post(Entity<?> entity, GenericType<T> responseType) {
+        return method(HttpMethod.POST, entity, responseType);
+    }
+
+    @Override
+    public Flowable<Response> delete() {
+        return delete(Response.class);
+    }
+
+    @Override
+    public <T> Flowable<T> delete(Class<T> responseType) {
+        return method(HttpMethod.DELETE, responseType);
+    }
+
+    @Override
+    public <T> Flowable<T> delete(GenericType<T> responseType) {
+        return method(HttpMethod.DELETE, responseType);
+    }
+
+    @Override
+    public Flowable<Response> head() {
+        return method(HttpMethod.HEAD);
+    }
+
+    @Override
+    public Flowable<Response> options() {
+        return options(Response.class);
+    }
+
+    @Override
+    public <T> Flowable<T> options(Class<T> responseType) {
+        return method(HttpMethod.OPTIONS, responseType);
+    }
+
+    @Override
+    public <T> Flowable<T> options(GenericType<T> responseType) {
+        return method(HttpMethod.OPTIONS, responseType);
+    }
+
+    @Override
+    public Flowable<Response> trace() {
+        return trace(Response.class);
+    }
+
+    // javax.ws.rs.HttpMethod declares no TRACE constant, hence the string literal below.
+    @Override
+    public <T> Flowable<T> trace(Class<T> responseType) {
+        return method("TRACE", responseType);
+    }
+
+    @Override
+    public <T> Flowable<T> trace(GenericType<T> responseType) {
+        return method("TRACE", responseType);
+    }
+
+    @Override
+    public Flowable<Response> method(String name) {
+        return method(name, Response.class);
+    }
+
+    @Override
+    public Flowable<Response> method(String name, Entity<?> entity) {
+        return method(name, entity, Response.class);
+    }
+
+    @Override
+    public <T> Flowable<T> method(String name, Entity<?> entity, Class<T> responseType) {
+        return create(() -> syncInvoker.method(name, entity, responseType));
+    }
+
+    @Override
+    public <T> Flowable<T> method(String name, Entity<?> entity, GenericType<T> responseType) {
+        return create(() -> syncInvoker.method(name, entity, responseType));
+    }
+
+    @Override
+    public <T> Flowable<T> method(String name, Class<T> responseType) {
+        return create(() -> syncInvoker.method(name, responseType));
+    }
+
+    @Override
+    public <T> Flowable<T> method(String name, GenericType<T> responseType) {
+        return create(() -> syncInvoker.method(name, responseType));
+    }
+
+    /**
+     * Wraps a blocking invocation into a single-element {@link Flowable}.
+     * <p>
+     * The supplier is evaluated on subscription; its result is emitted followed by completion,
+     * and any {@link Throwable} it raises is signalled through {@code onError}. No signal is
+     * sent once the subscriber has cancelled.
+     *
+     * @param supplier the blocking call producing the response
+     * @return a flowable scheduled on the configured executor, or on the I/O scheduler if none
+     */
+    private <T> Flowable<T> create(Supplier<T> supplier) {
+        Flowable<T> flowable = Flowable.create(new FlowableOnSubscribe<T>() {
+            @Override
+            public void subscribe(FlowableEmitter<T> emitter) throws Exception {
+                try {
+                    T response = supplier.get();
+                    if (!emitter.isCancelled()) {
+                        emitter.onNext(response);
+                    }
+
+                    if (!emitter.isCancelled()) {
+                        emitter.onComplete();
+                    }
+                } catch (Throwable e) {
+                    if (!emitter.isCancelled()) {
+                        emitter.onError(e);
+                    }
+                }
+            }
+        // BUFFER rather than DROP: exactly one item is ever emitted, and with DROP that sole
+        // response would be silently discarded (leaving an empty, completed stream) whenever
+        // the subscriber had not yet requested anything at the moment the call returned.
+        }, BackpressureStrategy.BUFFER);
+
+        if (sc == null) {
+            return flowable.subscribeOn(Schedulers.io());
+        }
+
+        return flowable.subscribeOn(sc).observeOn(sc);
+    }
+
+}
diff --git
a/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/client/FlowableRxInvokerProvider.java
b/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/client/FlowableRxInvokerProvider.java
new file mode 100644
index 0000000..9933d4f
--- /dev/null
+++
b/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/client/FlowableRxInvokerProvider.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.rx3.client;
+
+import java.util.concurrent.ExecutorService;
+
+import javax.ws.rs.client.RxInvokerProvider;
+import javax.ws.rs.client.SyncInvoker;
+import javax.ws.rs.ext.Provider;
+
+/**
+ * {@link RxInvokerProvider} that hands out {@link FlowableRxInvoker} instances backed by
+ * {@link FlowableRxInvokerImpl}.
+ */
+@Provider
+public class FlowableRxInvokerProvider implements RxInvokerProvider<FlowableRxInvoker> {
+
+    /**
+     * Builds a new invoker around the given synchronous invoker; the executor service is
+     * passed through to the implementation unchanged.
+     */
+    @Override
+    public FlowableRxInvoker getRxInvoker(SyncInvoker syncInvoker, ExecutorService executorService) {
+        final FlowableRxInvoker invoker = new FlowableRxInvokerImpl(syncInvoker, executorService);
+        return invoker;
+    }
+
+    /** Matches only the exact {@link FlowableRxInvoker} class object. */
+    @Override
+    public boolean isProviderFor(Class<?> rxCls) {
+        return rxCls == FlowableRxInvoker.class;
+    }
+
+}
diff --git
a/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/client/ObservableRxInvoker.java
b/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/client/ObservableRxInvoker.java
new file mode 100644
index 0000000..ab53a00
--- /dev/null
+++
b/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/client/ObservableRxInvoker.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.rx3.client;
+
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.client.RxInvoker;
+import javax.ws.rs.core.GenericType;
+import javax.ws.rs.core.Response;
+
+import io.reactivex.rxjava3.core.Observable;
+
+/**
+ * Reactive JAX-RS client invoker whose operations return RxJava 3 {@link Observable} instances.
+ * <p>
+ * Every method narrows the wildcard return type declared by {@link RxInvoker} to a concretely
+ * typed {@code Observable}, so callers do not have to cast the result.
+ */
+public interface ObservableRxInvoker extends RxInvoker<Observable<?>> {
+    // ---- GET ----
+    @Override
+    Observable<Response> get();
+
+    @Override
+    <T> Observable<T> get(Class<T> responseType);
+
+    @Override
+    <T> Observable<T> get(GenericType<T> responseType);
+
+    // ---- PUT ----
+    @Override
+    Observable<Response> put(Entity<?> entity);
+
+    @Override
+    <T> Observable<T> put(Entity<?> entity, Class<T> clazz);
+
+    @Override
+    <T> Observable<T> put(Entity<?> entity, GenericType<T> type);
+
+    // ---- POST ----
+    @Override
+    Observable<Response> post(Entity<?> entity);
+
+    @Override
+    <T> Observable<T> post(Entity<?> entity, Class<T> clazz);
+
+    @Override
+    <T> Observable<T> post(Entity<?> entity, GenericType<T> type);
+
+    // ---- DELETE ----
+    @Override
+    Observable<Response> delete();
+
+    @Override
+    <T> Observable<T> delete(Class<T> responseType);
+
+    @Override
+    <T> Observable<T> delete(GenericType<T> responseType);
+
+    // ---- HEAD ----
+    @Override
+    Observable<Response> head();
+
+    // ---- OPTIONS ----
+    @Override
+    Observable<Response> options();
+
+    @Override
+    <T> Observable<T> options(Class<T> responseType);
+
+    @Override
+    <T> Observable<T> options(GenericType<T> responseType);
+
+    // ---- TRACE ----
+    @Override
+    Observable<Response> trace();
+
+    @Override
+    <T> Observable<T> trace(Class<T> responseType);
+
+    @Override
+    <T> Observable<T> trace(GenericType<T> responseType);
+
+    // ---- arbitrary HTTP method, without and with a request entity ----
+    @Override
+    Observable<Response> method(String name);
+
+    @Override
+    <T> Observable<T> method(String name, Class<T> responseType);
+
+    @Override
+    <T> Observable<T> method(String name, GenericType<T> responseType);
+
+    @Override
+    Observable<Response> method(String name, Entity<?> entity);
+
+    @Override
+    <T> Observable<T> method(String name, Entity<?> entity, Class<T> responseType);
+
+    @Override
+    <T> Observable<T> method(String name, Entity<?> entity, GenericType<T> responseType);
+}
+
diff --git
a/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/client/ObservableRxInvokerImpl.java
b/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/client/ObservableRxInvokerImpl.java
new file mode 100644
index 0000000..c9f544e
--- /dev/null
+++
b/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/client/ObservableRxInvokerImpl.java
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.rx3.client;
+
+import java.util.concurrent.ExecutorService;
+import java.util.function.Supplier;
+
+import javax.ws.rs.HttpMethod;
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.client.SyncInvoker;
+import javax.ws.rs.core.GenericType;
+import javax.ws.rs.core.Response;
+
+import io.reactivex.rxjava3.core.Observable;
+import io.reactivex.rxjava3.core.ObservableEmitter;
+import io.reactivex.rxjava3.core.ObservableOnSubscribe;
+import io.reactivex.rxjava3.core.Scheduler;
+import io.reactivex.rxjava3.schedulers.Schedulers;
+
+/**
+ * {@link ObservableRxInvoker} implementation that adapts a blocking {@link SyncInvoker} to
+ * RxJava 3 {@link Observable}s.
+ * <p>
+ * The wrapped synchronous call is performed when the returned {@code Observable} is subscribed
+ * to. It runs on a scheduler built from the {@link ExecutorService} given at construction time,
+ * or on {@link Schedulers#io()} when no executor was supplied.
+ */
+public class ObservableRxInvokerImpl implements ObservableRxInvoker {
+    /** Scheduler derived from the caller's executor; {@code null} means "use Schedulers.io()". */
+    private final Scheduler scheduler;
+    private final SyncInvoker syncInvoker;
+
+    /**
+     * @param syncInvoker blocking invoker that actually performs the HTTP exchange
+     * @param ex executor to run invocations on; may be {@code null}
+     */
+    public ObservableRxInvokerImpl(SyncInvoker syncInvoker, ExecutorService ex) {
+        this.syncInvoker = syncInvoker;
+        if (ex == null) {
+            this.scheduler = null;
+        } else {
+            this.scheduler = Schedulers.from(ex);
+        }
+    }
+
+    @Override
+    public Observable<Response> get() {
+        return get(Response.class);
+    }
+
+    @Override
+    public <T> Observable<T> get(Class<T> responseType) {
+        return method(HttpMethod.GET, responseType);
+    }
+
+    @Override
+    public <T> Observable<T> get(GenericType<T> responseType) {
+        return method(HttpMethod.GET, responseType);
+    }
+
+    @Override
+    public Observable<Response> put(Entity<?> entity) {
+        return put(entity, Response.class);
+    }
+
+    @Override
+    public <T> Observable<T> put(Entity<?> entity, Class<T> responseType) {
+        return method(HttpMethod.PUT, entity, responseType);
+    }
+
+    @Override
+    public <T> Observable<T> put(Entity<?> entity, GenericType<T> responseType) {
+        return method(HttpMethod.PUT, entity, responseType);
+    }
+
+    @Override
+    public Observable<Response> post(Entity<?> entity) {
+        return post(entity, Response.class);
+    }
+
+    @Override
+    public <T> Observable<T> post(Entity<?> entity, Class<T> responseType) {
+        return method(HttpMethod.POST, entity, responseType);
+    }
+
+    @Override
+    public <T> Observable<T> post(Entity<?> entity, GenericType<T> responseType) {
+        return method(HttpMethod.POST, entity, responseType);
+    }
+
+    @Override
+    public Observable<Response> delete() {
+        return delete(Response.class);
+    }
+
+    @Override
+    public <T> Observable<T> delete(Class<T> responseType) {
+        return method(HttpMethod.DELETE, responseType);
+    }
+
+    @Override
+    public <T> Observable<T> delete(GenericType<T> responseType) {
+        return method(HttpMethod.DELETE, responseType);
+    }
+
+    @Override
+    public Observable<Response> head() {
+        return method(HttpMethod.HEAD);
+    }
+
+    @Override
+    public Observable<Response> options() {
+        return options(Response.class);
+    }
+
+    @Override
+    public <T> Observable<T> options(Class<T> responseType) {
+        return method(HttpMethod.OPTIONS, responseType);
+    }
+
+    @Override
+    public <T> Observable<T> options(GenericType<T> responseType) {
+        return method(HttpMethod.OPTIONS, responseType);
+    }
+
+    @Override
+    public Observable<Response> trace() {
+        return trace(Response.class);
+    }
+
+    // javax.ws.rs.HttpMethod declares no TRACE constant, hence the string literal below.
+    @Override
+    public <T> Observable<T> trace(Class<T> responseType) {
+        return method("TRACE", responseType);
+    }
+
+    @Override
+    public <T> Observable<T> trace(GenericType<T> responseType) {
+        return method("TRACE", responseType);
+    }
+
+    @Override
+    public Observable<Response> method(String name) {
+        return method(name, Response.class);
+    }
+
+    @Override
+    public Observable<Response> method(String name, Entity<?> entity) {
+        return method(name, entity, Response.class);
+    }
+
+    @Override
+    public <T> Observable<T> method(String name, Entity<?> entity, Class<T> responseType) {
+        return create(() -> syncInvoker.method(name, entity, responseType));
+    }
+
+    @Override
+    public <T> Observable<T> method(String name, Entity<?> entity, GenericType<T> responseType) {
+        return create(() -> syncInvoker.method(name, entity, responseType));
+    }
+
+    @Override
+    public <T> Observable<T> method(String name, Class<T> responseType) {
+        return create(() -> syncInvoker.method(name, responseType));
+    }
+
+    @Override
+    public <T> Observable<T> method(String name, GenericType<T> responseType) {
+        return create(() -> syncInvoker.method(name, responseType));
+    }
+
+    /**
+     * Wraps a blocking invocation into a single-element {@link Observable}.
+     * <p>
+     * The supplier is evaluated on subscription; its result is emitted followed by completion,
+     * and any {@link Throwable} it raises is signalled through {@code onError}. No signal is
+     * sent once the observer has disposed.
+     *
+     * @param supplier the blocking call producing the response
+     * @return an observable scheduled on the configured executor, or on the I/O scheduler if none
+     */
+    private <T> Observable<T> create(Supplier<T> supplier) {
+        final ObservableOnSubscribe<T> source = (ObservableEmitter<T> emitter) -> {
+            try {
+                final T payload = supplier.get();
+                // Disposal is permanent, so bailing out early equals re-checking before each signal.
+                if (emitter.isDisposed()) {
+                    return;
+                }
+                emitter.onNext(payload);
+                if (emitter.isDisposed()) {
+                    return;
+                }
+                emitter.onComplete();
+            } catch (Throwable failure) {
+                if (!emitter.isDisposed()) {
+                    emitter.onError(failure);
+                }
+            }
+        };
+        final Observable<T> observable = Observable.create(source);
+
+        return scheduler == null
+            ? observable.subscribeOn(Schedulers.io())
+            : observable.subscribeOn(scheduler).observeOn(scheduler);
+    }
+
+}
diff --git
a/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/client/ObservableRxInvokerProvider.java
b/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/client/ObservableRxInvokerProvider.java
new file mode 100644
index 0000000..649f7c1
--- /dev/null
+++
b/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/client/ObservableRxInvokerProvider.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.rx3.client;
+
+import java.util.concurrent.ExecutorService;
+
+import javax.ws.rs.client.RxInvokerProvider;
+import javax.ws.rs.client.SyncInvoker;
+import javax.ws.rs.ext.Provider;
+
+/**
+ * {@link RxInvokerProvider} that hands out {@link ObservableRxInvoker} instances backed by
+ * {@link ObservableRxInvokerImpl}.
+ */
+@Provider
+public class ObservableRxInvokerProvider implements RxInvokerProvider<ObservableRxInvoker> {
+    /**
+     * Builds a new invoker around the given synchronous invoker; the executor service is
+     * passed through to the implementation unchanged.
+     */
+    @Override
+    public ObservableRxInvoker getRxInvoker(SyncInvoker syncInvoker, ExecutorService executorService) {
+        final ObservableRxInvoker invoker = new ObservableRxInvokerImpl(syncInvoker, executorService);
+        return invoker;
+    }
+
+    /** Matches only the exact {@link ObservableRxInvoker} class object. */
+    @Override
+    public boolean isProviderFor(Class<?> rxCls) {
+        return rxCls == ObservableRxInvoker.class;
+    }
+}
diff --git
a/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/server/ReactiveIOCustomizer.java
b/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/server/ReactiveIOCustomizer.java
new file mode 100644
index 0000000..bf07eec
--- /dev/null
+++
b/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/server/ReactiveIOCustomizer.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.rx3.server;
+
+import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
+import org.apache.cxf.jaxrs.ext.AbstractStreamingResponseExtension;
+import org.apache.cxf.service.invoker.Invoker;
+
+/**
+ * Server factory customization that installs a {@link ReactiveIOInvoker}.
+ * <p>
+ * When the factory bean carries a {@code useStreamingSubscriber} property, its value is
+ * forwarded to {@link ReactiveIOInvoker#setUseStreamingSubscriberIfPossible(boolean)};
+ * otherwise the invoker keeps whatever default it was constructed with.
+ */
+public class ReactiveIOCustomizer extends AbstractStreamingResponseExtension {
+    /** Name of the factory bean property that toggles the streaming subscriber. */
+    private static final String USE_STREAMING_SUBSCRIBER = "useStreamingSubscriber";
+
+    /**
+     * Creates the invoker and applies the optional {@code useStreamingSubscriber} setting.
+     *
+     * @param bean the server factory bean whose properties are consulted
+     * @return a freshly created {@link ReactiveIOInvoker}
+     */
+    @Override
+    protected Invoker createInvoker(JAXRSServerFactoryBean bean) {
+        final Object configured = bean.getProperties(true).get(USE_STREAMING_SUBSCRIBER);
+        ReactiveIOInvoker invoker = new ReactiveIOInvoker();
+        if (configured != null) {
+            // Property values are plain Objects and are frequently supplied as text (e.g. "true"
+            // from XML configuration); a blind (Boolean) cast would fail with ClassCastException.
+            final boolean useStreamingSubscriber = configured instanceof Boolean
+                ? (Boolean)configured
+                : Boolean.parseBoolean(configured.toString());
+            invoker.setUseStreamingSubscriberIfPossible(useStreamingSubscriber);
+        }
+        return invoker;
+    }
+}
diff --git
a/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/server/ReactiveIOInvoker.java
b/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/server/ReactiveIOInvoker.java
new file mode 100644
index 0000000..6739092
--- /dev/null
+++
b/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/server/ReactiveIOInvoker.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.rx3.server;
+
+import org.apache.cxf.jaxrs.impl.AsyncResponseImpl;
+import org.apache.cxf.jaxrs.reactivestreams.server.AbstractReactiveInvoker;
+import org.apache.cxf.message.Message;
+
+import io.reactivex.rxjava3.core.Flowable;
+import io.reactivex.rxjava3.core.Observable;
+import io.reactivex.rxjava3.core.Single;
+import io.reactivex.rxjava3.disposables.Disposable;
+
+/**
+ * Server-side invoker that bridges RxJava 3 values returned by resource methods
+ * ({@link Flowable}, {@link Single}, {@link Observable}) to an {@link AsyncResponseImpl}.
+ */
+public class ReactiveIOInvoker extends AbstractReactiveInvoker {
+    /**
+     * Dispatches on the runtime type of {@code result}.
+     *
+     * @return the async response bound to the reactive value, or {@code null} when
+     *         {@code result} is none of the supported RxJava types
+     */
+    protected AsyncResponseImpl checkFutureResponse(Message inMessage, Object result) {
+        if (result instanceof Flowable) {
+            return handleFlowable(inMessage, (Flowable<?>)result);
+        }
+        if (result instanceof Single) {
+            return handleSingle(inMessage, (Single<?>)result);
+        }
+        if (result instanceof Observable) {
+            return handleObservable(inMessage, (Observable<?>)result);
+        }
+        return null;
+    }
+
+    /**
+     * Subscribes to the single: its value resumes the async response, and an error is routed
+     * to {@code handleThrowable}.
+     */
+    protected AsyncResponseImpl handleSingle(Message inMessage, Single<?> single) {
+        final AsyncResponseImpl asyncResponse = new AsyncResponseImpl(inMessage);
+        final Disposable subscription = single.subscribe(
+            asyncResponse::resume,
+            error -> handleThrowable(asyncResponse, error));
+        if (subscription == null) {
+            throw new IllegalStateException("Subscribe did not return a Disposable");
+        }
+        return asyncResponse;
+    }
+
+    /**
+     * Binds the flowable to a new async response. The direct subscription below is made only
+     * when {@code isStreamingSubscriberUsed} returns {@code false} for this flowable.
+     */
+    protected AsyncResponseImpl handleFlowable(Message inMessage, Flowable<?> f) {
+        final AsyncResponseImpl asyncResponse = new AsyncResponseImpl(inMessage);
+        final boolean streamed = isStreamingSubscriberUsed(f, asyncResponse, inMessage);
+        if (!streamed) {
+            final Disposable subscription = f.subscribe(
+                asyncResponse::resume,
+                error -> handleThrowable(asyncResponse, error));
+            if (subscription == null) {
+                throw new IllegalStateException("Subscribe did not return a Disposable");
+            }
+        }
+        return asyncResponse;
+    }
+
+    /**
+     * Subscribes to the observable: each emitted item is passed to {@code resume}, and an
+     * error is routed to {@code handleThrowable}.
+     */
+    protected AsyncResponseImpl handleObservable(Message inMessage, Observable<?> obs) {
+        final AsyncResponseImpl asyncResponse = new AsyncResponseImpl(inMessage);
+        final Disposable subscription = obs.subscribe(
+            item -> asyncResponse.resume(item),
+            error -> handleThrowable(asyncResponse, error));
+        if (subscription == null) {
+            throw new IllegalStateException("Subscribe did not return a Disposable");
+        }
+        return asyncResponse;
+    }
+
+}
diff --git
a/rt/rs/extensions/rx3/src/main/resources/META-INF/services/org.apache.cxf.jaxrs.ext.JAXRSServerFactoryCustomizationExtension
b/rt/rs/extensions/rx3/src/main/resources/META-INF/services/org.apache.cxf.jaxrs.ext.JAXRSServerFactoryCustomizationExtension
new file mode 100644
index 0000000..b6f1f28
--- /dev/null
+++
b/rt/rs/extensions/rx3/src/main/resources/META-INF/services/org.apache.cxf.jaxrs.ext.JAXRSServerFactoryCustomizationExtension
@@ -0,0 +1 @@
+org.apache.cxf.jaxrs.rx3.server.ReactiveIOCustomizer
\ No newline at end of file
diff --git a/rt/rs/pom.xml b/rt/rs/pom.xml
index 2b8bfd8..9f47638 100644
--- a/rt/rs/pom.xml
+++ b/rt/rs/pom.xml
@@ -39,6 +39,7 @@
<module>extensions/search</module>
<module>extensions/rx</module>
<module>extensions/rx2</module>
+ <module>extensions/rx3</module>
<module>extensions/reactor</module>
<module>extensions/reactivestreams</module>
<module>security</module>
diff --git a/systests/jaxrs/pom.xml b/systests/jaxrs/pom.xml
index c968b06..fd89c66 100644
--- a/systests/jaxrs/pom.xml
+++ b/systests/jaxrs/pom.xml
@@ -60,10 +60,17 @@
<dependency>
<groupId>io.reactivex</groupId>
<artifactId>rxjava</artifactId>
+ <scope>test</scope>
</dependency>
<dependency>
<groupId>io.reactivex.rxjava2</groupId>
<artifactId>rxjava</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>io.reactivex.rxjava3</groupId>
+ <artifactId>rxjava</artifactId>
+ <scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.cxf</groupId>
@@ -332,6 +339,11 @@
</dependency>
<dependency>
<groupId>org.apache.cxf</groupId>
+ <artifactId>cxf-rt-rs-extension-rx3</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.cxf</groupId>
<artifactId>cxf-rt-rs-extension-reactor</artifactId>
<version>${project.version}</version>
</dependency>
diff --git
a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava3FlowableTest.java
b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava3FlowableTest.java
new file mode 100644
index 0000000..9df86bc
--- /dev/null
+++
b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava3FlowableTest.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.jaxrs.reactive;
+
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import javax.ws.rs.NotFoundException;
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.client.Invocation;
+import javax.ws.rs.core.GenericType;
+
+import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
+
+import org.apache.cxf.jaxrs.client.WebClient;
+import org.apache.cxf.jaxrs.model.AbstractResourceInfo;
+import org.apache.cxf.jaxrs.rx3.client.FlowableRxInvoker;
+import org.apache.cxf.jaxrs.rx3.client.FlowableRxInvokerProvider;
+import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
+
+import io.reactivex.rxjava3.core.Flowable;
+import io.reactivex.rxjava3.subscribers.TestSubscriber;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Client-side system tests that call the endpoints started by
+ * {@link RxJava3FlowableServer}, both through a plain {@link WebClient} and
+ * through the RxJava 3 {@link FlowableRxInvoker}.
+ */
+public class JAXRSRxJava3FlowableTest extends AbstractBusClientServerTestBase {
+    public static final String PORT = RxJava3FlowableServer.PORT;
+
+    @BeforeClass
+    public static void startServers() throws Exception {
+        AbstractResourceInfo.clearAllMaps();
+        final boolean launched = launchServer(RxJava3FlowableServer.class, true);
+        assertTrue("server did not launch correctly", launched);
+        createStaticBus();
+    }
+
+    /** Builds the absolute URL of a test endpoint from its path. */
+    private static String endpoint(String path) {
+        return "http://localhost:" + PORT + path;
+    }
+
+    @Test
+    public void testGetHelloWorldAsyncText() throws Exception {
+        final WebClient client = WebClient.create(endpoint("/rx3/flowable/textAsync"));
+        final String body = client.accept("text/plain").get(String.class);
+        assertEquals("Hello, world!", body);
+    }
+
+    @Test
+    public void testGetHelloWorldJson() throws Exception {
+        final List<Object> providers = new LinkedList<>();
+        providers.add(new JacksonJsonProvider());
+        providers.add(new FlowableRxInvokerProvider());
+        final WebClient client = WebClient.create(endpoint("/rx3/flowable/textJson"), providers);
+        final Flowable<HelloWorldBean> flowable = client.accept("application/json")
+            .rx(FlowableRxInvoker.class)
+            .get(HelloWorldBean.class);
+
+        final TestSubscriber<HelloWorldBean> subscriber = new TestSubscriber<>();
+        flowable.subscribe(subscriber);
+
+        subscriber.await(3, TimeUnit.SECONDS);
+        subscriber.assertResult(new HelloWorldBean("Hello", "World"));
+    }
+
+    @Test
+    public void testGetHelloWorldJsonImplicitListAsync() throws Exception {
+        doTestGetHelloWorldJsonList(endpoint("/rx3/flowable/textJsonImplicitListAsync"));
+    }
+
+    @Test
+    public void testGetHelloWorldJsonImplicitListAsyncStream() throws Exception {
+        doTestGetHelloWorldJsonList(endpoint("/rx3/flowable/textJsonImplicitListAsyncStream"));
+    }
+
+    @Test
+    public void testGetHelloWorldJsonImplicitList() throws Exception {
+        // "/rx33" is the address RxJava3FlowableServer publishes with useStreamingSubscriber=true.
+        doTestGetHelloWorldJsonList(endpoint("/rx33/flowable/textJsonImplicitList"));
+    }
+
+    /** Fetches a JSON list from the address and checks for the two expected greetings. */
+    private void doTestGetHelloWorldJsonList(String address) throws Exception {
+        final WebClient client = WebClient.create(address,
+            Collections.singletonList(new JacksonJsonProvider()));
+        WebClient.getConfig(client).getHttpConduit().getClient().setReceiveTimeout(10000000);
+        final GenericType<List<HelloWorldBean>> listType = new GenericType<List<HelloWorldBean>>() {
+        };
+
+        final List<HelloWorldBean> received = client.accept("application/json").get(listType);
+        assertEquals(2, received.size());
+        final HelloWorldBean first = received.get(0);
+        assertEquals("Hello", first.getGreeting());
+        assertEquals("World", first.getAudience());
+        final HelloWorldBean second = received.get(1);
+        assertEquals("Ciao", second.getGreeting());
+        assertEquals("World", second.getAudience());
+    }
+
+    @Test
+    public void testGetHelloWorldJsonSingle() throws Exception {
+        final WebClient client = WebClient.create(endpoint("/rx33/flowable/textJsonSingle"),
+            Collections.singletonList(new JacksonJsonProvider()));
+
+        final HelloWorldBean received = client.accept("application/json").get(HelloWorldBean.class);
+        assertEquals("Hello", received.getGreeting());
+        assertEquals("World", received.getAudience());
+    }
+
+    @Test
+    public void testGetHelloWorldAsyncObservable() throws Exception {
+        final WebClient client = WebClient.create(endpoint("/rx3/flowable/textAsync"),
+            Collections.singletonList(new FlowableRxInvokerProvider()));
+        final Flowable<String> flowable = client.accept("text/plain")
+            .rx(FlowableRxInvoker.class)
+            .get(String.class);
+
+        final TestSubscriber<String> subscriber = new TestSubscriber<>();
+        flowable.map(text -> text + text).subscribe(subscriber);
+
+        subscriber.await(2, TimeUnit.SECONDS);
+        subscriber.assertResult("Hello, world!Hello, world!");
+    }
+
+    @Test
+    public void testGetHelloWorldAsyncObservable404() throws Exception {
+        final Invocation.Builder builder = ClientBuilder.newClient()
+            .register(new FlowableRxInvokerProvider())
+            .target(endpoint("/rx3/flowable/textAsync404"))
+            .request();
+
+        final TestSubscriber<String> subscriber = new TestSubscriber<>();
+        builder.rx(FlowableRxInvoker.class)
+            .get(String.class)
+            .subscribe(subscriber);
+
+        subscriber.await(1, TimeUnit.SECONDS);
+        subscriber.assertError(NotFoundException.class);
+    }
+}
diff --git
a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava3ObservableTest.java
b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava3ObservableTest.java
new file mode 100644
index 0000000..6b66577
--- /dev/null
+++
b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava3ObservableTest.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.jaxrs.reactive;
+
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+
+import javax.ws.rs.core.GenericType;
+import javax.xml.ws.Holder;
+
+import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
+
+import org.apache.cxf.jaxrs.client.WebClient;
+import org.apache.cxf.jaxrs.model.AbstractResourceInfo;
+import org.apache.cxf.jaxrs.rx3.client.ObservableRxInvoker;
+import org.apache.cxf.jaxrs.rx3.client.ObservableRxInvokerProvider;
+import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
+
+import io.reactivex.rxjava3.core.Observable;
+import io.reactivex.rxjava3.disposables.Disposable;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Client-side system tests that call the endpoints started by
+ * {@link RxJava3ObservableServer}, both through a plain {@link WebClient} and
+ * through the RxJava 3 {@link ObservableRxInvoker}.
+ */
+public class JAXRSRxJava3ObservableTest extends AbstractBusClientServerTestBase {
+    public static final String PORT = RxJava3ObservableServer.PORT;
+
+    @BeforeClass
+    public static void startServers() throws Exception {
+        AbstractResourceInfo.clearAllMaps();
+        assertTrue("server did not launch correctly",
+                   launchServer(RxJava3ObservableServer.class, true));
+        createStaticBus();
+    }
+
+    @Test
+    public void testGetHelloWorldText() throws Exception {
+        String address = "http://localhost:" + PORT + "/rx3/observable/text";
+        WebClient wc = WebClient.create(address);
+        String text = wc.accept("text/plain").get(String.class);
+        assertEquals("Hello, world!", text);
+    }
+
+    @Test
+    public void testGetHelloWorldJson() throws Exception {
+        String address = "http://localhost:" + PORT + "/rx3/observable/textJson";
+        List<Object> providers = new LinkedList<>();
+        providers.add(new JacksonJsonProvider());
+        providers.add(new ObservableRxInvokerProvider());
+        WebClient wc = WebClient.create(address, providers);
+        Observable<HelloWorldBean> obs = wc.accept("application/json")
+            .rx(ObservableRxInvoker.class)
+            .get(HelloWorldBean.class);
+
+        Holder<HelloWorldBean> holder = new Holder<>();
+        Holder<Throwable> failure = new Holder<>();
+        // Subscribe with an explicit error consumer: without one RxJava hands the
+        // error to its global error hook and this test would only see a missing value.
+        Disposable d = obs.subscribe(v -> {
+            holder.value = v;
+        }, t -> {
+            failure.value = t;
+        });
+        if (d == null) {
+            throw new IllegalStateException("Subscribe did not return a Disposable");
+        }
+        // NOTE(review): still a fixed wait for the asynchronous response.
+        Thread.sleep(2000);
+        if (failure.value != null) {
+            throw new AssertionError("Observable signalled an error instead of a value", failure.value);
+        }
+        // Report a missing value as a test failure rather than a NullPointerException.
+        assertTrue("No value was received from the Observable", holder.value != null);
+        assertEquals("Hello", holder.value.getGreeting());
+        assertEquals("World", holder.value.getAudience());
+    }
+
+    @Test
+    public void testGetHelloWorldJsonList() throws Exception {
+        String address = "http://localhost:" + PORT + "/rx3/observable/textJsonList";
+        doTestGetHelloWorldJsonList(address);
+    }
+
+    /** Fetches a JSON list from the address and checks for the two expected greetings. */
+    private void doTestGetHelloWorldJsonList(String address) throws Exception {
+        WebClient wc = WebClient.create(address,
+                                        Collections.singletonList(new JacksonJsonProvider()));
+        WebClient.getConfig(wc).getHttpConduit().getClient().setReceiveTimeout(10000000);
+        GenericType<List<HelloWorldBean>> genericResponseType = new GenericType<List<HelloWorldBean>>() {
+        };
+
+        List<HelloWorldBean> beans = wc.accept("application/json").get(genericResponseType);
+        assertEquals(2, beans.size());
+        assertEquals("Hello", beans.get(0).getGreeting());
+        assertEquals("World", beans.get(0).getAudience());
+        assertEquals("Ciao", beans.get(1).getGreeting());
+        assertEquals("World", beans.get(1).getAudience());
+    }
+}
diff --git
a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3FlowableServer.java
b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3FlowableServer.java
new file mode 100644
index 0000000..f78d918
--- /dev/null
+++
b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3FlowableServer.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.jaxrs.reactive;
+
+import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.BusFactory;
+import org.apache.cxf.ext.logging.LoggingOutInterceptor;
+import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
+import org.apache.cxf.jaxrs.lifecycle.SingletonResourceProvider;
+import org.apache.cxf.jaxrs.rx3.server.ReactiveIOCustomizer;
+import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
+
+
+/**
+ * Test server that publishes {@link RxJava3FlowableService} at two addresses:
+ * {@code /rx3} with {@code useStreamingSubscriber} set to {@code false} and
+ * {@code /rx33} with it set to {@code true}.
+ */
+public class RxJava3FlowableServer extends AbstractBusTestServerBase {
+    public static final String PORT = allocatePort(RxJava3FlowableServer.class);
+
+    org.apache.cxf.endpoint.Server server;
+    org.apache.cxf.endpoint.Server server2;
+
+    public RxJava3FlowableServer() {
+    }
+
+    protected void run() {
+        Bus bus = BusFactory.getDefaultBus();
+        // Make sure default JSONProvider is not loaded
+        bus.setProperty("skip.default.json.provider.registration", true);
+        server = createFactoryBean(bus, false, "/rx3").create();
+        // Keep the second endpoint in its own field so tearDown() can stop both.
+        server2 = createFactoryBean(bus, true, "/rx33").create();
+    }
+
+    /**
+     * Builds a server factory for {@link RxJava3FlowableService}.
+     *
+     * @param bus the bus obtained by the caller; not used by this method
+     * @param useStreamingSubscriber value stored under the {@code useStreamingSubscriber} property
+     * @param relAddress path appended to {@code http://localhost:PORT}
+     * @return the configured, not yet created, factory bean
+     */
+    private JAXRSServerFactoryBean createFactoryBean(Bus bus, boolean useStreamingSubscriber,
+                                                     String relAddress) {
+        JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean();
+        sf.getProperties(true).put("useStreamingSubscriber", useStreamingSubscriber);
+        sf.setProvider(new JacksonJsonProvider());
+        new ReactiveIOCustomizer().customize(sf);
+        sf.getOutInterceptors().add(new LoggingOutInterceptor());
+        sf.setResourceClasses(RxJava3FlowableService.class);
+        sf.setResourceProvider(RxJava3FlowableService.class,
+                               new SingletonResourceProvider(new RxJava3FlowableService(), true));
+        sf.setAddress("http://localhost:" + PORT + relAddress);
+        return sf;
+    }
+
+    public void tearDown() throws Exception {
+        shutdown(server);
+        server = null;
+        shutdown(server2);
+        server2 = null;
+    }
+
+    /** Stops and destroys the endpoint; does nothing if it was never created. */
+    private static void shutdown(org.apache.cxf.endpoint.Server endpoint) {
+        if (endpoint != null) {
+            endpoint.stop();
+            endpoint.destroy();
+        }
+    }
+
+    public static void main(String[] args) {
+        try {
+            RxJava3FlowableServer s = new RxJava3FlowableServer();
+            s.start();
+        } catch (Exception ex) {
+            ex.printStackTrace();
+            System.exit(-1);
+        } finally {
+            System.out.println("done!");
+        }
+    }
+
+}
diff --git
a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3FlowableService.java
b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3FlowableService.java
new file mode 100644
index 0000000..af67f50
--- /dev/null
+++
b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3FlowableService.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.jaxrs.reactive;
+
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.Suspended;
+
+import org.apache.cxf.jaxrs.reactivestreams.server.AbstractSubscriber;
+import
org.apache.cxf.jaxrs.reactivestreams.server.JsonStreamingAsyncSubscriber;
+
+import io.reactivex.rxjava3.core.BackpressureStrategy;
+import io.reactivex.rxjava3.core.Flowable;
+import io.reactivex.rxjava3.core.Single;
+import io.reactivex.rxjava3.schedulers.Schedulers;
+
+
+/**
+ * Test resource whose methods produce their responses through RxJava 3
+ * {@link Flowable} and {@link Single} sources, either returned directly or
+ * bridged to a suspended {@link AsyncResponse}.
+ */
+@Path("/flowable")
+public class RxJava3FlowableService {
+
+    /** Returns a flowable that emits one default {@code HelloWorldBean}. */
+    @GET
+    @Produces("application/json")
+    @Path("textJson")
+    public Flowable<HelloWorldBean> getJson() {
+        return Flowable.just(new HelloWorldBean());
+    }
+
+    /**
+     * On a new thread, waits one second, then subscribes a {@code ListAsyncSubscriber}
+     * bound to the suspended response to a flowable of two beans.
+     */
+    @GET
+    @Produces("application/json")
+    @Path("textJsonImplicitListAsync")
+    public void getJsonImplicitListAsync(@Suspended AsyncResponse ar) {
+        final HelloWorldBean bean1 = new HelloWorldBean();
+        final HelloWorldBean bean2 = new HelloWorldBean("Ciao");
+        new Thread(() -> {
+            sleep();
+            Flowable.just(bean1, bean2).subscribe(new ListAsyncSubscriber<HelloWorldBean>(ar));
+        }).start();
+    }
+
+    /**
+     * Subscribes a {@link JsonStreamingAsyncSubscriber} bound to the suspended
+     * response to a flowable of two beans, on the computation scheduler.
+     */
+    @GET
+    @Produces("application/json")
+    @Path("textJsonImplicitListAsyncStream")
+    public void getJsonImplicitListStreamingAsync(@Suspended AsyncResponse ar) {
+        Flowable.just("Hello", "Ciao")
+            .map(HelloWorldBean::new)
+            .subscribeOn(Schedulers.computation())
+            .subscribe(new JsonStreamingAsyncSubscriber<HelloWorldBean>(ar));
+    }
+
+    /**
+     * Returns a flowable whose two beans are emitted from a separate thread,
+     * with a one second pause after each, followed by completion.
+     */
+    @GET
+    @Produces("application/json")
+    @Path("textJsonImplicitList")
+    public Flowable<HelloWorldBean> getJsonImplicitList() {
+        return Flowable.create(emitter -> {
+            Thread t = new Thread(() -> {
+                emitter.onNext(new HelloWorldBean("Hello"));
+                sleep();
+                emitter.onNext(new HelloWorldBean("Ciao"));
+                sleep();
+                emitter.onComplete();
+            });
+            t.start();
+        }, BackpressureStrategy.MISSING);
+    }
+
+    /** Returns a single backed by a future that yields a bean after one second. */
+    @GET
+    @Produces("application/json")
+    @Path("textJsonSingle")
+    public Single<HelloWorldBean> getJsonSingle() {
+        CompletableFuture<HelloWorldBean> completableFuture = CompletableFuture
+            .supplyAsync(() -> {
+                sleep();
+                return new HelloWorldBean("Hello");
+            });
+        return Single.fromFuture(completableFuture);
+    }
+
+    /** Pauses the calling thread for one second; an interrupt ends the pause early. */
+    private static void sleep() {
+        try {
+            Thread.sleep(1000);
+        } catch (InterruptedException ex) {
+            // Carry on with the response, but restore the flag so the interrupt is not lost.
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * Subscribes a {@code StringAsyncSubscriber} bound to the suspended response
+     * to a one-element flowable carrying "Hello, world!".
+     */
+    @GET
+    @Produces("text/plain")
+    @Path("textAsync")
+    public void getTextAsync(@Suspended final AsyncResponse ar) {
+        Flowable.just("Hello, ").map(s -> s + "world!")
+            .subscribe(new StringAsyncSubscriber(ar));
+    }
+
+    /** Concatenates every received string and resumes with the result on completion. */
+    private static class StringAsyncSubscriber extends AbstractSubscriber<String> {
+
+        private final StringBuilder sb = new StringBuilder();
+
+        StringAsyncSubscriber(AsyncResponse ar) {
+            super(ar);
+        }
+
+        @Override
+        public void onComplete() {
+            super.resume(sb.toString());
+        }
+
+        @Override
+        public void onNext(String s) {
+            sb.append(s);
+            super.requestNext();
+        }
+
+    }
+
+    /** Collects every received element and resumes with the whole list on completion. */
+    private static class ListAsyncSubscriber<T> extends AbstractSubscriber<T> {
+
+        private final List<T> beans = new LinkedList<>();
+
+        ListAsyncSubscriber(AsyncResponse ar) {
+            super(ar);
+        }
+
+        @Override
+        public void onComplete() {
+            super.resume(beans);
+        }
+
+        @Override
+        public void onNext(T bean) {
+            beans.add(bean);
+            super.requestNext();
+        }
+
+    }
+}
+
+
diff --git
a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3ObservableServer.java
b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3ObservableServer.java
new file mode 100644
index 0000000..0a8fcaa
--- /dev/null
+++
b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3ObservableServer.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.jaxrs.reactive;
+
+import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.BusFactory;
+import org.apache.cxf.ext.logging.LoggingOutInterceptor;
+import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
+import org.apache.cxf.jaxrs.lifecycle.SingletonResourceProvider;
+import org.apache.cxf.jaxrs.rx3.server.ReactiveIOInvoker;
+import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
+
+
+/**
+ * Test server that publishes {@link RxJava3ObservableService} at the root
+ * address of its allocated port, using {@link ReactiveIOInvoker} as the invoker.
+ */
+public class RxJava3ObservableServer extends AbstractBusTestServerBase {
+    public static final String PORT = allocatePort(RxJava3ObservableServer.class);
+
+    org.apache.cxf.endpoint.Server server;
+
+    public RxJava3ObservableServer() {
+    }
+
+    protected void run() {
+        final Bus defaultBus = BusFactory.getDefaultBus();
+        // Make sure default JSONProvider is not loaded
+        defaultBus.setProperty("skip.default.json.provider.registration", true);
+        server = newFactoryBean().create();
+    }
+
+    /** Builds the configured, not yet created, factory bean for the observable service. */
+    private static JAXRSServerFactoryBean newFactoryBean() {
+        final JAXRSServerFactoryBean factory = new JAXRSServerFactoryBean();
+        factory.setInvoker(new ReactiveIOInvoker());
+        factory.setProvider(new JacksonJsonProvider());
+        factory.getOutInterceptors().add(new LoggingOutInterceptor());
+        factory.setResourceClasses(RxJava3ObservableService.class);
+        final RxJava3ObservableService service = new RxJava3ObservableService();
+        factory.setResourceProvider(RxJava3ObservableService.class,
+                                    new SingletonResourceProvider(service, true));
+        factory.setAddress("http://localhost:" + PORT + "/");
+        return factory;
+    }
+
+    public void tearDown() throws Exception {
+        server.stop();
+        server.destroy();
+        server = null;
+    }
+
+    public static void main(String[] args) {
+        try {
+            final RxJava3ObservableServer instance = new RxJava3ObservableServer();
+            instance.start();
+        } catch (Exception ex) {
+            ex.printStackTrace();
+            System.exit(-1);
+        } finally {
+            System.out.println("done!");
+        }
+    }
+
+}
diff --git
a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3ObservableService.java
b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3ObservableService.java
new file mode 100644
index 0000000..51d362c
--- /dev/null
+++
b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3ObservableService.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.jaxrs.reactive;
+
+import java.util.Arrays;
+import java.util.List;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+
+import io.reactivex.rxjava3.core.Observable;
+
+/**
+ * Test resource whose methods return their responses as RxJava 3
+ * {@link Observable} instances.
+ */
+@Path("/rx3/observable")
+public class RxJava3ObservableService {
+
+    /** Returns an observable that emits the plain-text greeting. */
+    @GET
+    @Produces("text/plain")
+    @Path("text")
+    public Observable<String> getText() {
+        final String greeting = "Hello, world!";
+        return Observable.just(greeting);
+    }
+
+    /** Returns an observable that emits one default {@code HelloWorldBean}. */
+    @GET
+    @Produces("application/json")
+    @Path("textJson")
+    public Observable<HelloWorldBean> getJson() {
+        final HelloWorldBean bean = new HelloWorldBean();
+        return Observable.just(bean);
+    }
+
+    /**
+     * Returns an observable that emits one list holding a default bean followed
+     * by a bean whose greeting is set to "Ciao".
+     */
+    @GET
+    @Produces("application/json")
+    @Path("textJsonList")
+    public Observable<List<HelloWorldBean>> getJsonList() {
+        final HelloWorldBean ciao = new HelloWorldBean();
+        ciao.setGreeting("Ciao");
+        final HelloWorldBean hello = new HelloWorldBean();
+        final List<HelloWorldBean> beans = Arrays.asList(hello, ciao);
+        return Observable.just(beans);
+    }
+
+}