This is an automated email from the ASF dual-hosted git repository.

reta pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/cxf.git


The following commit(s) were added to refs/heads/master by this push:
     new e38fc4a  CXF-8078: Support RxJava 3.0
e38fc4a is described below

commit e38fc4aa14d7741369de5d027b30e52def3a0549
Author: reta <[email protected]>
AuthorDate: Mon Apr 13 22:38:32 2020 -0400

    CXF-8078: Support RxJava 3.0
---
 parent/pom.xml                                     |   6 +
 rt/rs/extensions/rx3/pom.xml                       |  58 ++++++
 .../cxf/jaxrs/rx3/client/FlowableRxInvoker.java    | 105 +++++++++++
 .../jaxrs/rx3/client/FlowableRxInvokerImpl.java    | 199 +++++++++++++++++++++
 .../rx3/client/FlowableRxInvokerProvider.java      |  40 +++++
 .../cxf/jaxrs/rx3/client/ObservableRxInvoker.java  | 104 +++++++++++
 .../jaxrs/rx3/client/ObservableRxInvokerImpl.java  | 198 ++++++++++++++++++++
 .../rx3/client/ObservableRxInvokerProvider.java    |  38 ++++
 .../cxf/jaxrs/rx3/server/ReactiveIOCustomizer.java |  36 ++++
 .../cxf/jaxrs/rx3/server/ReactiveIOInvoker.java    |  71 ++++++++
 ...rs.ext.JAXRSServerFactoryCustomizationExtension |   1 +
 rt/rs/pom.xml                                      |   1 +
 systests/jaxrs/pom.xml                             |  12 ++
 .../jaxrs/reactive/JAXRSRxJava3FlowableTest.java   | 155 ++++++++++++++++
 .../jaxrs/reactive/JAXRSRxJava3ObservableTest.java | 106 +++++++++++
 .../jaxrs/reactive/RxJava3FlowableServer.java      |  81 +++++++++
 .../jaxrs/reactive/RxJava3FlowableService.java     | 165 +++++++++++++++++
 .../jaxrs/reactive/RxJava3ObservableServer.java    |  73 ++++++++
 .../jaxrs/reactive/RxJava3ObservableService.java   |  58 ++++++
 19 files changed, 1507 insertions(+)

diff --git a/parent/pom.xml b/parent/pom.xml
index 25c1466..5f93219 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -185,6 +185,7 @@
         <cxf.rhino.version>1.7R2</cxf.rhino.version>
         <cxf.rxjava.version>1.3.8</cxf.rxjava.version>
         <cxf.rxjava2.version>2.2.19</cxf.rxjava2.version>
+        <cxf.rxjava3.version>3.0.2</cxf.rxjava3.version>
         
<cxf.servlet-api.artifact>jakarta.servlet-api</cxf.servlet-api.artifact>
         <cxf.servlet-api.group>jakarta.servlet</cxf.servlet-api.group>
         <cxf.servlet-api.version>4.0.3</cxf.servlet-api.version>
@@ -1428,6 +1429,11 @@
                 <version>${cxf.rxjava2.version}</version>
             </dependency>
             <dependency>
+                <groupId>io.reactivex.rxjava3</groupId>
+                <artifactId>rxjava</artifactId>
+                <version>${cxf.rxjava3.version}</version>
+            </dependency>
+            <dependency>
                 <groupId>io.projectreactor</groupId>
                 <artifactId>reactor-core</artifactId>
                 <version>${cxf.reactor.version}</version>
diff --git a/rt/rs/extensions/rx3/pom.xml b/rt/rs/extensions/rx3/pom.xml
new file mode 100644
index 0000000..93be0ef
--- /dev/null
+++ b/rt/rs/extensions/rx3/pom.xml
@@ -0,0 +1,58 @@
+<?xml version="1.0"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements. See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership. The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied. See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>cxf-rt-rs-extension-rx3</artifactId>
+    <packaging>bundle</packaging>
+    <name>Apache CXF JAX-RS Extensions: RxJava3</name>
+    <description>Apache CXF JAX-RS Extensions: RxJava3</description>
+    <url>https://cxf.apache.org</url>
+    <parent>
+        <groupId>org.apache.cxf</groupId>
+        <artifactId>cxf-parent</artifactId>
+        <version>3.4.0-SNAPSHOT</version>
+        <relativePath>../../../../parent/pom.xml</relativePath>
+    </parent>
+    <properties>
+        <cxf.module.name>org.apache.cxf.rs.rx3</cxf.module.name>
+    </properties>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.cxf</groupId>
+            <artifactId>cxf-rt-frontend-jaxrs</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.cxf</groupId>
+            <artifactId>cxf-rt-rs-client</artifactId>
+            <version>${project.version}</version>
+        </dependency> 
+        <dependency>
+          <groupId>io.reactivex.rxjava3</groupId>
+          <artifactId>rxjava</artifactId>
+          <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.cxf</groupId>
+            <artifactId>cxf-rt-rs-extension-reactivestreams</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+    </dependencies>
+</project>
diff --git 
a/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/client/FlowableRxInvoker.java
 
b/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/client/FlowableRxInvoker.java
new file mode 100644
index 0000000..4b89736
--- /dev/null
+++ 
b/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/client/FlowableRxInvoker.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.rx3.client;
+
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.client.RxInvoker;
+import javax.ws.rs.core.GenericType;
+import javax.ws.rs.core.Response;
+
+import io.reactivex.rxjava3.core.Flowable;
+
+
+public interface FlowableRxInvoker extends RxInvoker<Flowable<?>> {
+    @Override
+    Flowable<Response> get();
+
+    @Override
+    <T> Flowable<T> get(Class<T> responseType);
+
+    @Override
+    <T> Flowable<T> get(GenericType<T> responseType);
+
+    @Override
+    Flowable<Response> put(Entity<?> entity);
+
+    @Override
+    <T> Flowable<T> put(Entity<?> entity, Class<T> clazz);
+
+    @Override
+    <T> Flowable<T> put(Entity<?> entity, GenericType<T> type);
+
+    @Override
+    Flowable<Response> post(Entity<?> entity);
+
+    @Override
+    <T> Flowable<T> post(Entity<?> entity, Class<T> clazz);
+
+    @Override
+    <T> Flowable<T> post(Entity<?> entity, GenericType<T> type);
+
+    @Override
+    Flowable<Response> delete();
+
+    @Override
+    <T> Flowable<T> delete(Class<T> responseType);
+
+    @Override
+    <T> Flowable<T> delete(GenericType<T> responseType);
+
+    @Override
+    Flowable<Response> head();
+
+    @Override
+    Flowable<Response> options();
+
+    @Override
+    <T> Flowable<T> options(Class<T> responseType);
+
+    @Override
+    <T> Flowable<T> options(GenericType<T> responseType);
+
+    @Override
+    Flowable<Response> trace();
+
+    @Override
+    <T> Flowable<T> trace(Class<T> responseType);
+
+    @Override
+    <T> Flowable<T> trace(GenericType<T> responseType);
+
+    @Override
+    Flowable<Response> method(String name);
+
+    @Override
+    <T> Flowable<T> method(String name, Class<T> responseType);
+
+    @Override
+    <T> Flowable<T> method(String name, GenericType<T> responseType);
+
+    @Override
+    Flowable<Response> method(String name, Entity<?> entity);
+
+    @Override
+    <T> Flowable<T> method(String name, Entity<?> entity, Class<T> 
responseType);
+
+    @Override
+    <T> Flowable<T> method(String name, Entity<?> entity, GenericType<T> 
responseType);
+}
+
diff --git 
a/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/client/FlowableRxInvokerImpl.java
 
b/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/client/FlowableRxInvokerImpl.java
new file mode 100644
index 0000000..19f5cac
--- /dev/null
+++ 
b/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/client/FlowableRxInvokerImpl.java
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.rx3.client;
+
+import java.util.concurrent.ExecutorService;
+import java.util.function.Supplier;
+
+import javax.ws.rs.HttpMethod;
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.client.SyncInvoker;
+import javax.ws.rs.core.GenericType;
+import javax.ws.rs.core.Response;
+
+import io.reactivex.rxjava3.core.BackpressureStrategy;
+import io.reactivex.rxjava3.core.Flowable;
+import io.reactivex.rxjava3.core.FlowableEmitter;
+import io.reactivex.rxjava3.core.FlowableOnSubscribe;
+import io.reactivex.rxjava3.core.Scheduler;
+import io.reactivex.rxjava3.schedulers.Schedulers;
+
+public class FlowableRxInvokerImpl implements FlowableRxInvoker {
+    private final Scheduler sc;
+    private final SyncInvoker syncInvoker;
+    
+    public FlowableRxInvokerImpl(SyncInvoker syncInvoker, ExecutorService ex) {
+        this.syncInvoker = syncInvoker;
+        this.sc = ex == null ? null : Schedulers.from(ex);
+    }
+
+    @Override
+    public Flowable<Response> get() {
+        return get(Response.class);
+    }
+
+    @Override
+    public <T> Flowable<T> get(Class<T> responseType) {
+        return method(HttpMethod.GET, responseType);
+    }
+
+    @Override
+    public <T> Flowable<T> get(GenericType<T> responseType) {
+        return method(HttpMethod.GET, responseType);
+    }
+
+    @Override
+    public Flowable<Response> put(Entity<?> entity) {
+        return put(entity, Response.class);
+    }
+
+    @Override
+    public <T> Flowable<T> put(Entity<?> entity, Class<T> responseType) {
+        return method(HttpMethod.PUT, entity, responseType);
+    }
+
+    @Override
+    public <T> Flowable<T> put(Entity<?> entity, GenericType<T> responseType) {
+        return method(HttpMethod.PUT, entity, responseType);
+    }
+
+    @Override
+    public Flowable<Response> post(Entity<?> entity) {
+        return post(entity, Response.class);
+    }
+
+    @Override
+    public <T> Flowable<T> post(Entity<?> entity, Class<T> responseType) {
+        return method(HttpMethod.POST, entity, responseType);
+    }
+
+    @Override
+    public <T> Flowable<T> post(Entity<?> entity, GenericType<T> responseType) 
{
+        return method(HttpMethod.POST, entity, responseType);
+    }
+
+    @Override
+    public Flowable<Response> delete() {
+        return delete(Response.class);
+    }
+
+    @Override
+    public <T> Flowable<T> delete(Class<T> responseType) {
+        return method(HttpMethod.DELETE, responseType);
+    }
+
+    @Override
+    public <T> Flowable<T> delete(GenericType<T> responseType) {
+        return method(HttpMethod.DELETE, responseType);
+    }
+
+    @Override
+    public Flowable<Response> head() {
+        return method(HttpMethod.HEAD);
+    }
+
+    @Override
+    public Flowable<Response> options() {
+        return options(Response.class);
+    }
+
+    @Override
+    public <T> Flowable<T> options(Class<T> responseType) {
+        return method(HttpMethod.OPTIONS, responseType);
+    }
+
+    @Override
+    public <T> Flowable<T> options(GenericType<T> responseType) {
+        return method(HttpMethod.OPTIONS, responseType);
+    }
+
+    @Override
+    public Flowable<Response> trace() {
+        return trace(Response.class);
+    }
+
+    @Override
+    public <T> Flowable<T> trace(Class<T> responseType) {
+        return method("TRACE", responseType);
+    }
+
+    @Override
+    public <T> Flowable<T> trace(GenericType<T> responseType) {
+        return method("TRACE", responseType);
+    }
+
+    @Override
+    public Flowable<Response> method(String name) {
+        return method(name, Response.class);
+    }
+
+    @Override
+    public Flowable<Response> method(String name, Entity<?> entity) {
+        return method(name, entity, Response.class);
+    }
+
+    @Override
+    public <T> Flowable<T> method(String name, Entity<?> entity, Class<T> 
responseType) {
+        return create(() -> syncInvoker.method(name, entity, responseType));
+    }
+    
+    @Override
+    public <T> Flowable<T> method(String name, Entity<?> entity, 
GenericType<T> responseType) {
+        return create(() -> syncInvoker.method(name, entity, responseType));
+    }
+
+    @Override
+    public <T> Flowable<T> method(String name, Class<T> responseType) {
+        return create(() -> syncInvoker.method(name, responseType));
+    }
+
+    @Override
+    public <T> Flowable<T> method(String name, GenericType<T> responseType) {
+        return create(() -> syncInvoker.method(name, responseType));
+    }
+    
+    private <T> Flowable<T> create(Supplier<T> supplier) {
+        Flowable<T> flowable = Flowable.create(new FlowableOnSubscribe<T>() {
+            @Override
+            public void subscribe(FlowableEmitter<T> emitter) throws Exception 
{
+                try {
+                    T response = supplier.get();
+                    if (!emitter.isCancelled()) {
+                        emitter.onNext(response);
+                    }
+                    
+                    if (!emitter.isCancelled()) {
+                        emitter.onComplete();
+                    }
+                } catch (Throwable e) {
+                    if (!emitter.isCancelled()) {
+                        emitter.onError(e);
+                    }
+                }
+            }
+        }, BackpressureStrategy.DROP);
+        
+        if (sc == null) {
+            return flowable.subscribeOn(Schedulers.io());
+        }
+        
+        return flowable.subscribeOn(sc).observeOn(sc);
+    }
+
+}
diff --git 
a/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/client/FlowableRxInvokerProvider.java
 
b/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/client/FlowableRxInvokerProvider.java
new file mode 100644
index 0000000..9933d4f
--- /dev/null
+++ 
b/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/client/FlowableRxInvokerProvider.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.rx3.client;
+
+import java.util.concurrent.ExecutorService;
+
+import javax.ws.rs.client.RxInvokerProvider;
+import javax.ws.rs.client.SyncInvoker;
+import javax.ws.rs.ext.Provider;
+
+@Provider
+public class FlowableRxInvokerProvider implements 
RxInvokerProvider<FlowableRxInvoker> {
+
+    @Override
+    public FlowableRxInvoker getRxInvoker(SyncInvoker syncInvoker, 
ExecutorService executorService) {
+        return new FlowableRxInvokerImpl(syncInvoker, executorService);
+    }
+
+    @Override
+    public boolean isProviderFor(Class<?> rxCls) {
+        return FlowableRxInvoker.class == rxCls;
+    }
+
+}
diff --git 
a/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/client/ObservableRxInvoker.java
 
b/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/client/ObservableRxInvoker.java
new file mode 100644
index 0000000..ab53a00
--- /dev/null
+++ 
b/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/client/ObservableRxInvoker.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.rx3.client;
+
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.client.RxInvoker;
+import javax.ws.rs.core.GenericType;
+import javax.ws.rs.core.Response;
+
+import io.reactivex.rxjava3.core.Observable;
+
+public interface ObservableRxInvoker extends RxInvoker<Observable<?>> {
+    @Override
+    Observable<Response> get();
+
+    @Override
+    <T> Observable<T> get(Class<T> responseType);
+
+    @Override
+    <T> Observable<T> get(GenericType<T> responseType);
+
+    @Override
+    Observable<Response> put(Entity<?> entity);
+
+    @Override
+    <T> Observable<T> put(Entity<?> entity, Class<T> clazz);
+
+    @Override
+    <T> Observable<T> put(Entity<?> entity, GenericType<T> type);
+
+    @Override
+    Observable<Response> post(Entity<?> entity);
+
+    @Override
+    <T> Observable<T> post(Entity<?> entity, Class<T> clazz);
+
+    @Override
+    <T> Observable<T> post(Entity<?> entity, GenericType<T> type);
+
+    @Override
+    Observable<Response> delete();
+
+    @Override
+    <T> Observable<T> delete(Class<T> responseType);
+
+    @Override
+    <T> Observable<T> delete(GenericType<T> responseType);
+
+    @Override
+    Observable<Response> head();
+
+    @Override
+    Observable<Response> options();
+
+    @Override
+    <T> Observable<T> options(Class<T> responseType);
+
+    @Override
+    <T> Observable<T> options(GenericType<T> responseType);
+
+    @Override
+    Observable<Response> trace();
+
+    @Override
+    <T> Observable<T> trace(Class<T> responseType);
+
+    @Override
+    <T> Observable<T> trace(GenericType<T> responseType);
+
+    @Override
+    Observable<Response> method(String name);
+
+    @Override
+    <T> Observable<T> method(String name, Class<T> responseType);
+
+    @Override
+    <T> Observable<T> method(String name, GenericType<T> responseType);
+
+    @Override
+    Observable<Response> method(String name, Entity<?> entity);
+
+    @Override
+    <T> Observable<T> method(String name, Entity<?> entity, Class<T> 
responseType);
+
+    @Override
+    <T> Observable<T> method(String name, Entity<?> entity, GenericType<T> 
responseType);
+}
+
diff --git 
a/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/client/ObservableRxInvokerImpl.java
 
b/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/client/ObservableRxInvokerImpl.java
new file mode 100644
index 0000000..c9f544e
--- /dev/null
+++ 
b/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/client/ObservableRxInvokerImpl.java
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.rx3.client;
+
+import java.util.concurrent.ExecutorService;
+import java.util.function.Supplier;
+
+import javax.ws.rs.HttpMethod;
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.client.SyncInvoker;
+import javax.ws.rs.core.GenericType;
+import javax.ws.rs.core.Response;
+
+import io.reactivex.rxjava3.core.Observable;
+import io.reactivex.rxjava3.core.ObservableEmitter;
+import io.reactivex.rxjava3.core.ObservableOnSubscribe;
+import io.reactivex.rxjava3.core.Scheduler;
+import io.reactivex.rxjava3.schedulers.Schedulers;
+
+public class ObservableRxInvokerImpl implements ObservableRxInvoker {
+    private final Scheduler sc;
+    private final SyncInvoker syncInvoker;
+    
+    public ObservableRxInvokerImpl(SyncInvoker syncInvoker, ExecutorService 
ex) {
+        this.syncInvoker = syncInvoker;
+        this.sc = ex == null ? null : Schedulers.from(ex);
+    }
+
+    @Override
+    public Observable<Response> get() {
+        return get(Response.class);
+    }
+
+    @Override
+    public <T> Observable<T> get(Class<T> responseType) {
+        return method(HttpMethod.GET, responseType);
+    }
+
+    @Override
+    public <T> Observable<T> get(GenericType<T> responseType) {
+        return method(HttpMethod.GET, responseType);
+    }
+
+    @Override
+    public Observable<Response> put(Entity<?> entity) {
+        return put(entity, Response.class);
+    }
+
+    @Override
+    public <T> Observable<T> put(Entity<?> entity, Class<T> responseType) {
+        return method(HttpMethod.PUT, entity, responseType);
+    }
+
+    @Override
+    public <T> Observable<T> put(Entity<?> entity, GenericType<T> 
responseType) {
+        return method(HttpMethod.PUT, entity, responseType);
+    }
+
+    @Override
+    public Observable<Response> post(Entity<?> entity) {
+        return post(entity, Response.class);
+    }
+
+    @Override
+    public <T> Observable<T> post(Entity<?> entity, Class<T> responseType) {
+        return method(HttpMethod.POST, entity, responseType);
+    }
+
+    @Override
+    public <T> Observable<T> post(Entity<?> entity, GenericType<T> 
responseType) {
+        return method(HttpMethod.POST, entity, responseType);
+    }
+
+    @Override
+    public Observable<Response> delete() {
+        return delete(Response.class);
+    }
+
+    @Override
+    public <T> Observable<T> delete(Class<T> responseType) {
+        return method(HttpMethod.DELETE, responseType);
+    }
+
+    @Override
+    public <T> Observable<T> delete(GenericType<T> responseType) {
+        return method(HttpMethod.DELETE, responseType);
+    }
+
+    @Override
+    public Observable<Response> head() {
+        return method(HttpMethod.HEAD);
+    }
+
+    @Override
+    public Observable<Response> options() {
+        return options(Response.class);
+    }
+
+    @Override
+    public <T> Observable<T> options(Class<T> responseType) {
+        return method(HttpMethod.OPTIONS, responseType);
+    }
+
+    @Override
+    public <T> Observable<T> options(GenericType<T> responseType) {
+        return method(HttpMethod.OPTIONS, responseType);
+    }
+
+    @Override
+    public Observable<Response> trace() {
+        return trace(Response.class);
+    }
+
+    @Override
+    public <T> Observable<T> trace(Class<T> responseType) {
+        return method("TRACE", responseType);
+    }
+
+    @Override
+    public <T> Observable<T> trace(GenericType<T> responseType) {
+        return method("TRACE", responseType);
+    }
+
+    @Override
+    public Observable<Response> method(String name) {
+        return method(name, Response.class);
+    }
+
+    @Override
+    public Observable<Response> method(String name, Entity<?> entity) {
+        return method(name, entity, Response.class);
+    }
+
+    @Override
+    public <T> Observable<T> method(String name, Entity<?> entity, Class<T> 
responseType) {
+        return create(() -> syncInvoker.method(name, entity, responseType));
+    }
+    
+    @Override
+    public <T> Observable<T> method(String name, Entity<?> entity, 
GenericType<T> responseType) {
+        return create(() -> syncInvoker.method(name, entity, responseType));
+    }
+
+    @Override
+    public <T> Observable<T> method(String name, Class<T> responseType) {
+        return create(() -> syncInvoker.method(name, responseType));
+    }
+
+    @Override
+    public <T> Observable<T> method(String name, GenericType<T> responseType) {
+        return create(() -> syncInvoker.method(name, responseType));
+    }
+    
+    private <T> Observable<T> create(Supplier<T> supplier) {
+        Observable<T> observable = Observable.create(new 
ObservableOnSubscribe<T>() {
+            @Override
+            public void subscribe(ObservableEmitter<T> emitter) throws 
Exception {
+                try {
+                    T response = supplier.get();
+                    if (!emitter.isDisposed()) {
+                        emitter.onNext(response);
+                    }
+                    
+                    if (!emitter.isDisposed()) {
+                        emitter.onComplete();
+                    }
+                } catch (Throwable e) {
+                    if (!emitter.isDisposed()) {
+                        emitter.onError(e);
+                    }
+                }
+            }
+        });
+        
+        if (sc == null) {
+            return observable.subscribeOn(Schedulers.io());
+        }
+        
+        return observable.subscribeOn(sc).observeOn(sc);
+    }
+
+}
diff --git 
a/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/client/ObservableRxInvokerProvider.java
 
b/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/client/ObservableRxInvokerProvider.java
new file mode 100644
index 0000000..649f7c1
--- /dev/null
+++ 
b/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/client/ObservableRxInvokerProvider.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.rx3.client;
+
+import java.util.concurrent.ExecutorService;
+
+import javax.ws.rs.client.RxInvokerProvider;
+import javax.ws.rs.client.SyncInvoker;
+import javax.ws.rs.ext.Provider;
+
+@Provider
+public class ObservableRxInvokerProvider implements 
RxInvokerProvider<ObservableRxInvoker> {
+    @Override
+    public ObservableRxInvoker getRxInvoker(SyncInvoker syncInvoker, 
ExecutorService executorService) {
+        return new ObservableRxInvokerImpl(syncInvoker, executorService);
+    }
+
+    @Override
+    public boolean isProviderFor(Class<?> rxCls) {
+        return ObservableRxInvoker.class == rxCls;
+    }
+}
diff --git 
a/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/server/ReactiveIOCustomizer.java
 
b/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/server/ReactiveIOCustomizer.java
new file mode 100644
index 0000000..bf07eec
--- /dev/null
+++ 
b/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/server/ReactiveIOCustomizer.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.rx3.server;
+
+import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
+import org.apache.cxf.jaxrs.ext.AbstractStreamingResponseExtension;
+import org.apache.cxf.service.invoker.Invoker;
+
+public class ReactiveIOCustomizer extends AbstractStreamingResponseExtension {
+    @Override
+    protected Invoker createInvoker(JAXRSServerFactoryBean bean) {
+        Boolean useStreamingSubscriber = (Boolean)bean.getProperties(true)
+                .getOrDefault("useStreamingSubscriber", null);
+        ReactiveIOInvoker invoker = new ReactiveIOInvoker();
+        if (useStreamingSubscriber != null) {
+            
invoker.setUseStreamingSubscriberIfPossible(useStreamingSubscriber);
+        }
+        return invoker;
+    }
+}
diff --git 
a/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/server/ReactiveIOInvoker.java
 
b/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/server/ReactiveIOInvoker.java
new file mode 100644
index 0000000..6739092
--- /dev/null
+++ 
b/rt/rs/extensions/rx3/src/main/java/org/apache/cxf/jaxrs/rx3/server/ReactiveIOInvoker.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.jaxrs.rx3.server;
+
+import org.apache.cxf.jaxrs.impl.AsyncResponseImpl;
+import org.apache.cxf.jaxrs.reactivestreams.server.AbstractReactiveInvoker;
+import org.apache.cxf.message.Message;
+
+import io.reactivex.rxjava3.core.Flowable;
+import io.reactivex.rxjava3.core.Observable;
+import io.reactivex.rxjava3.core.Single;
+import io.reactivex.rxjava3.disposables.Disposable;
+
+public class ReactiveIOInvoker extends AbstractReactiveInvoker {
+    protected AsyncResponseImpl checkFutureResponse(Message inMessage, Object 
result) {
+        if (result instanceof Flowable) {
+            return handleFlowable(inMessage, (Flowable<?>)result);
+        } else if (result instanceof Single) {
+            return handleSingle(inMessage, (Single<?>)result);
+        } else if (result instanceof Observable) {
+            return handleObservable(inMessage, (Observable<?>)result);
+        }
+        return null;
+    }
+    
+    protected AsyncResponseImpl handleSingle(Message inMessage, Single<?> 
single) {
+        final AsyncResponseImpl asyncResponse = new 
AsyncResponseImpl(inMessage);
+        Disposable d = single.subscribe(asyncResponse::resume, t -> 
handleThrowable(asyncResponse, t));
+        if (d == null) {
+            throw new IllegalStateException("Subscribe did not return a 
Disposable");
+        }
+        return asyncResponse;
+    }
+
+    protected AsyncResponseImpl handleFlowable(Message inMessage, Flowable<?> 
f) {
+        final AsyncResponseImpl asyncResponse = new 
AsyncResponseImpl(inMessage);
+        if (!isStreamingSubscriberUsed(f, asyncResponse, inMessage)) {
+            Disposable d = f.subscribe(asyncResponse::resume, t -> 
handleThrowable(asyncResponse, t));
+            if (d == null) {
+                throw new IllegalStateException("Subscribe did not return a 
Disposable");
+            }
+        }
+        return asyncResponse;
+    }
+    
+    protected AsyncResponseImpl handleObservable(Message inMessage, 
Observable<?> obs) {
+        final AsyncResponseImpl asyncResponse = new 
AsyncResponseImpl(inMessage);
+        Disposable d = obs.subscribe(v -> asyncResponse.resume(v), t -> 
handleThrowable(asyncResponse, t));
+        if (d == null) {
+            throw new IllegalStateException("Subscribe did not return a 
Disposable");
+        }
+        return asyncResponse;
+    }
+
+}
diff --git 
a/rt/rs/extensions/rx3/src/main/resources/META-INF/services/org.apache.cxf.jaxrs.ext.JAXRSServerFactoryCustomizationExtension
 
b/rt/rs/extensions/rx3/src/main/resources/META-INF/services/org.apache.cxf.jaxrs.ext.JAXRSServerFactoryCustomizationExtension
new file mode 100644
index 0000000..b6f1f28
--- /dev/null
+++ 
b/rt/rs/extensions/rx3/src/main/resources/META-INF/services/org.apache.cxf.jaxrs.ext.JAXRSServerFactoryCustomizationExtension
@@ -0,0 +1 @@
+org.apache.cxf.jaxrs.rx3.server.ReactiveIOCustomizer
\ No newline at end of file
diff --git a/rt/rs/pom.xml b/rt/rs/pom.xml
index 2b8bfd8..9f47638 100644
--- a/rt/rs/pom.xml
+++ b/rt/rs/pom.xml
@@ -39,6 +39,7 @@
         <module>extensions/search</module>
         <module>extensions/rx</module>
         <module>extensions/rx2</module>
+        <module>extensions/rx3</module>
         <module>extensions/reactor</module>
         <module>extensions/reactivestreams</module>
         <module>security</module>
diff --git a/systests/jaxrs/pom.xml b/systests/jaxrs/pom.xml
index c968b06..fd89c66 100644
--- a/systests/jaxrs/pom.xml
+++ b/systests/jaxrs/pom.xml
@@ -60,10 +60,17 @@
         <dependency>
           <groupId>io.reactivex</groupId>
           <artifactId>rxjava</artifactId>
+          <scope>test</scope>
         </dependency>
         <dependency>
           <groupId>io.reactivex.rxjava2</groupId>
           <artifactId>rxjava</artifactId>
+          <scope>test</scope>
+        </dependency>
+        <dependency>
+          <groupId>io.reactivex.rxjava3</groupId>
+          <artifactId>rxjava</artifactId>
+          <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.cxf</groupId>
@@ -332,6 +339,11 @@
         </dependency>
         <dependency>
             <groupId>org.apache.cxf</groupId>
+            <artifactId>cxf-rt-rs-extension-rx3</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.cxf</groupId>
             <artifactId>cxf-rt-rs-extension-reactor</artifactId>
             <version>${project.version}</version>
         </dependency>
diff --git 
a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava3FlowableTest.java
 
b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava3FlowableTest.java
new file mode 100644
index 0000000..9df86bc
--- /dev/null
+++ 
b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava3FlowableTest.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.jaxrs.reactive;
+
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import javax.ws.rs.NotFoundException;
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.client.Invocation;
+import javax.ws.rs.core.GenericType;
+
+import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
+
+import org.apache.cxf.jaxrs.client.WebClient;
+import org.apache.cxf.jaxrs.model.AbstractResourceInfo;
+import org.apache.cxf.jaxrs.rx3.client.FlowableRxInvoker;
+import org.apache.cxf.jaxrs.rx3.client.FlowableRxInvokerProvider;
+import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
+
+import io.reactivex.rxjava3.core.Flowable;
+import io.reactivex.rxjava3.subscribers.TestSubscriber;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class JAXRSRxJava3FlowableTest extends AbstractBusClientServerTestBase {
+    public static final String PORT = RxJava3FlowableServer.PORT;
+    @BeforeClass
+    public static void startServers() throws Exception {
+        AbstractResourceInfo.clearAllMaps();
+        assertTrue("server did not launch correctly",
+                   launchServer(RxJava3FlowableServer.class, true));
+        createStaticBus();
+    }
+    @Test
+    public void testGetHelloWorldAsyncText() throws Exception {
+        String address = "http://localhost:"; + PORT + 
"/rx3/flowable/textAsync";
+        WebClient wc = WebClient.create(address);
+        String text = wc.accept("text/plain").get(String.class);
+        assertEquals("Hello, world!", text);
+    }
+
+    @Test
+    public void testGetHelloWorldJson() throws Exception {
+        String address = "http://localhost:"; + PORT + "/rx3/flowable/textJson";
+        List<Object> providers = new LinkedList<>();
+        providers.add(new JacksonJsonProvider());
+        providers.add(new FlowableRxInvokerProvider());
+        WebClient wc = WebClient.create(address, providers);
+        Flowable<HelloWorldBean> obs = wc.accept("application/json")
+            .rx(FlowableRxInvoker.class)
+            .get(HelloWorldBean.class);
+
+        final TestSubscriber<HelloWorldBean> subscriber = new 
TestSubscriber<>();
+        obs.subscribe(subscriber);
+
+        subscriber.await(3, TimeUnit.SECONDS);
+        subscriber.assertResult(new HelloWorldBean("Hello", "World"));
+    }
+
+    @Test
+    public void testGetHelloWorldJsonImplicitListAsync() throws Exception {
+        String address = "http://localhost:"; + PORT + 
"/rx3/flowable/textJsonImplicitListAsync";
+        doTestGetHelloWorldJsonList(address);
+    }
+    @Test
+    public void testGetHelloWorldJsonImplicitListAsyncStream() throws 
Exception {
+        String address = "http://localhost:"; + PORT + 
"/rx3/flowable/textJsonImplicitListAsyncStream";
+        doTestGetHelloWorldJsonList(address);
+    }
+    @Test
+    public void testGetHelloWorldJsonImplicitList() throws Exception {
+        String address = "http://localhost:"; + PORT + 
"/rx33/flowable/textJsonImplicitList";
+        doTestGetHelloWorldJsonList(address);
+    }
+    private void doTestGetHelloWorldJsonList(String address) throws Exception {
+        WebClient wc = WebClient.create(address,
+                                        Collections.singletonList(new 
JacksonJsonProvider()));
+        
WebClient.getConfig(wc).getHttpConduit().getClient().setReceiveTimeout(10000000);
+        GenericType<List<HelloWorldBean>> genericResponseType = new 
GenericType<List<HelloWorldBean>>() {
+        };
+
+        List<HelloWorldBean> beans = 
wc.accept("application/json").get(genericResponseType);
+        assertEquals(2, beans.size());
+        assertEquals("Hello", beans.get(0).getGreeting());
+        assertEquals("World", beans.get(0).getAudience());
+        assertEquals("Ciao", beans.get(1).getGreeting());
+        assertEquals("World", beans.get(1).getAudience());
+    }
+
+    @Test
+    public void testGetHelloWorldJsonSingle() throws Exception {
+        String address = "http://localhost:"; + PORT + 
"/rx33/flowable/textJsonSingle";
+        WebClient wc = WebClient.create(address,
+                                        Collections.singletonList(new 
JacksonJsonProvider()));
+
+        HelloWorldBean bean = 
wc.accept("application/json").get(HelloWorldBean.class);
+        assertEquals("Hello", bean.getGreeting());
+        assertEquals("World", bean.getAudience());
+    }
+
+    @Test
+    public void testGetHelloWorldAsyncObservable() throws Exception {
+        String address = "http://localhost:"; + PORT + 
"/rx3/flowable/textAsync";
+        WebClient wc = WebClient.create(address,
+                                        Collections.singletonList(new 
FlowableRxInvokerProvider()));
+        Flowable<String> obs = wc.accept("text/plain")
+            .rx(FlowableRxInvoker.class)
+            .get(String.class);
+
+        final TestSubscriber<String> subscriber = new TestSubscriber<>();
+        obs.map(s -> s + s).subscribe(subscriber);
+        
+        subscriber.await(2, TimeUnit.SECONDS);
+        subscriber.assertResult("Hello, world!Hello, world!");
+    }
+    
+    @Test
+    public void testGetHelloWorldAsyncObservable404() throws Exception {
+        String address = "http://localhost:"; + PORT + 
"/rx3/flowable/textAsync404";
+        Invocation.Builder b = ClientBuilder.newClient().register(new 
FlowableRxInvokerProvider())
+            .target(address).request();
+
+        final TestSubscriber<String> subscriber = new TestSubscriber<>();
+        b.rx(FlowableRxInvoker.class)
+            .get(String.class)
+            .subscribe(subscriber);
+        
+        subscriber.await(1, TimeUnit.SECONDS);
+        subscriber.assertError(NotFoundException.class);
+    }
+}
diff --git 
a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava3ObservableTest.java
 
b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava3ObservableTest.java
new file mode 100644
index 0000000..6b66577
--- /dev/null
+++ 
b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava3ObservableTest.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.jaxrs.reactive;
+
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+
+import javax.ws.rs.core.GenericType;
+import javax.xml.ws.Holder;
+
+import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
+
+import org.apache.cxf.jaxrs.client.WebClient;
+import org.apache.cxf.jaxrs.model.AbstractResourceInfo;
+import org.apache.cxf.jaxrs.rx3.client.ObservableRxInvoker;
+import org.apache.cxf.jaxrs.rx3.client.ObservableRxInvokerProvider;
+import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
+
+import io.reactivex.rxjava3.core.Observable;
+import io.reactivex.rxjava3.disposables.Disposable;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class JAXRSRxJava3ObservableTest extends 
AbstractBusClientServerTestBase {
+    public static final String PORT = RxJava3ObservableServer.PORT;
+    @BeforeClass
+    public static void startServers() throws Exception {
+        AbstractResourceInfo.clearAllMaps();
+        assertTrue("server did not launch correctly",
+                   launchServer(RxJava3ObservableServer.class, true));
+        createStaticBus();
+    }
+    @Test
+    public void testGetHelloWorldText() throws Exception {
+        String address = "http://localhost:"; + PORT + "/rx3/observable/text";
+        WebClient wc = WebClient.create(address);
+        String text = wc.accept("text/plain").get(String.class);
+        assertEquals("Hello, world!", text);
+    }
+
+    @Test
+    public void testGetHelloWorldJson() throws Exception {
+        String address = "http://localhost:"; + PORT + 
"/rx3/observable/textJson";
+        List<Object> providers = new LinkedList<>();
+        providers.add(new JacksonJsonProvider());
+        providers.add(new ObservableRxInvokerProvider());
+        WebClient wc = WebClient.create(address, providers);
+        Observable<HelloWorldBean> obs = wc.accept("application/json")
+            .rx(ObservableRxInvoker.class)
+            .get(HelloWorldBean.class);
+
+        Holder<HelloWorldBean> holder = new Holder<>();
+        Disposable d = obs.subscribe(v -> {
+            holder.value = v;
+        });
+        if (d == null) {
+            throw new IllegalStateException("Subscribe did not return a 
Disposable");
+        }
+        Thread.sleep(2000);
+        assertEquals("Hello", holder.value.getGreeting());
+        assertEquals("World", holder.value.getAudience());
+    }
+
+    @Test
+    public void testGetHelloWorldJsonList() throws Exception {
+        String address = "http://localhost:"; + PORT + 
"/rx3/observable/textJsonList";
+        doTestGetHelloWorldJsonList(address);
+    }
+
+    private void doTestGetHelloWorldJsonList(String address) throws Exception {
+        WebClient wc = WebClient.create(address,
+                                        Collections.singletonList(new 
JacksonJsonProvider()));
+        
WebClient.getConfig(wc).getHttpConduit().getClient().setReceiveTimeout(10000000);
+        GenericType<List<HelloWorldBean>> genericResponseType = new 
GenericType<List<HelloWorldBean>>() {
+        };
+
+        List<HelloWorldBean> beans = 
wc.accept("application/json").get(genericResponseType);
+        assertEquals(2, beans.size());
+        assertEquals("Hello", beans.get(0).getGreeting());
+        assertEquals("World", beans.get(0).getAudience());
+        assertEquals("Ciao", beans.get(1).getGreeting());
+        assertEquals("World", beans.get(1).getAudience());
+    }
+}
diff --git 
a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3FlowableServer.java
 
b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3FlowableServer.java
new file mode 100644
index 0000000..f78d918
--- /dev/null
+++ 
b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3FlowableServer.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.jaxrs.reactive;
+
+import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.BusFactory;
+import org.apache.cxf.ext.logging.LoggingOutInterceptor;
+import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
+import org.apache.cxf.jaxrs.lifecycle.SingletonResourceProvider;
+import org.apache.cxf.jaxrs.rx3.server.ReactiveIOCustomizer;
+import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
+
+
+public class RxJava3FlowableServer extends AbstractBusTestServerBase {
+    public static final String PORT = 
allocatePort(RxJava3FlowableServer.class);
+
+    org.apache.cxf.endpoint.Server server;
+    org.apache.cxf.endpoint.Server server2;
+    public RxJava3FlowableServer() {
+    }
+
+    protected void run() {
+        Bus bus = BusFactory.getDefaultBus();
+        // Make sure default JSONProvider is not loaded
+        bus.setProperty("skip.default.json.provider.registration", true);
+        server = createFactoryBean(bus, false, "/rx3").create();
+        server = createFactoryBean(bus, true, "/rx33").create();
+    }
+
+    private JAXRSServerFactoryBean createFactoryBean(Bus bus, boolean 
useStreamingSubscriber,
+                                                     String relAddress) {
+        JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean();
+        sf.getProperties(true).put("useStreamingSubscriber", 
useStreamingSubscriber);
+        sf.setProvider(new JacksonJsonProvider());
+        new ReactiveIOCustomizer().customize(sf);
+        sf.getOutInterceptors().add(new LoggingOutInterceptor());
+        sf.setResourceClasses(RxJava3FlowableService.class);
+        sf.setResourceProvider(RxJava3FlowableService.class,
+                               new SingletonResourceProvider(new 
RxJava3FlowableService(), true));
+        sf.setAddress("http://localhost:"; + PORT + relAddress);
+        return sf;
+    }
+
+    public void tearDown() throws Exception {
+        server.stop();
+        server.destroy();
+        server = null;
+    }
+
+    public static void main(String[] args) {
+        try {
+            RxJava3FlowableServer s = new RxJava3FlowableServer();
+            s.start();
+        } catch (Exception ex) {
+            ex.printStackTrace();
+            System.exit(-1);
+        } finally {
+            System.out.println("done!");
+        }
+    }
+
+}
diff --git 
a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3FlowableService.java
 
b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3FlowableService.java
new file mode 100644
index 0000000..af67f50
--- /dev/null
+++ 
b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3FlowableService.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.jaxrs.reactive;
+
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.Suspended;
+
+import org.apache.cxf.jaxrs.reactivestreams.server.AbstractSubscriber;
+import 
org.apache.cxf.jaxrs.reactivestreams.server.JsonStreamingAsyncSubscriber;
+
+import io.reactivex.rxjava3.core.BackpressureStrategy;
+import io.reactivex.rxjava3.core.Flowable;
+import io.reactivex.rxjava3.core.Single;
+import io.reactivex.rxjava3.schedulers.Schedulers;
+
+
+@Path("/flowable")
+public class RxJava3FlowableService {
+
+    
+    @GET
+    @Produces("application/json")
+    @Path("textJson")
+    public Flowable<HelloWorldBean> getJson() {
+        return Flowable.just(new HelloWorldBean());
+    }
+    
+    @GET
+    @Produces("application/json")
+    @Path("textJsonImplicitListAsync")
+    public void getJsonImplicitListAsync(@Suspended AsyncResponse ar) {
+        final HelloWorldBean bean1 = new HelloWorldBean();
+        final HelloWorldBean bean2 = new HelloWorldBean("Ciao");
+        new Thread(new Runnable() {
+            public void run() {
+                try {
+                    Thread.sleep(1000);
+                } catch (InterruptedException ex) {
+                    // ignore
+                }
+                Flowable.just(bean1, bean2).subscribe(new 
ListAsyncSubscriber<HelloWorldBean>(ar));
+            }
+        }).start();
+
+    }
+    @GET
+    @Produces("application/json")
+    @Path("textJsonImplicitListAsyncStream")
+    public void getJsonImplicitListStreamingAsync(@Suspended AsyncResponse ar) 
{
+        Flowable.just("Hello", "Ciao")
+            .map(HelloWorldBean::new)
+            .subscribeOn(Schedulers.computation())
+            .subscribe(new JsonStreamingAsyncSubscriber<HelloWorldBean>(ar));
+    }
+    
+    @GET
+    @Produces("application/json")
+    @Path("textJsonImplicitList")
+    public Flowable<HelloWorldBean> getJsonImplicitList() {
+        return Flowable.create(subscriber -> {
+            Thread t = new Thread(() -> {
+                subscriber.onNext(new HelloWorldBean("Hello"));
+                sleep();
+                subscriber.onNext(new HelloWorldBean("Ciao"));
+                sleep();
+                subscriber.onComplete();
+            });
+            t.start();
+        }, BackpressureStrategy.MISSING);
+    }
+    
+    @GET
+    @Produces("application/json")
+    @Path("textJsonSingle")
+    public Single<HelloWorldBean> getJsonSingle() {
+        CompletableFuture<HelloWorldBean> completableFuture = CompletableFuture
+            .supplyAsync(() -> {
+                sleep();
+                return new HelloWorldBean("Hello");
+            });
+        return Single.fromFuture(completableFuture);
+    }
+    
+    private static void sleep() {
+        try {
+            Thread.sleep(1000);
+        } catch (InterruptedException ex) {
+            // ignore
+        }
+    }
+    
+    @GET
+    @Produces("text/plain")
+    @Path("textAsync")
+    public void getTextAsync(@Suspended final AsyncResponse ar) {
+        Flowable.just("Hello, ").map(s -> s + "world!")
+            .subscribe(new StringAsyncSubscriber(ar));
+
+    }
+    
+    private static class StringAsyncSubscriber extends 
AbstractSubscriber<String> {
+
+        private StringBuilder sb = new StringBuilder();
+        StringAsyncSubscriber(AsyncResponse ar) {
+            super(ar);
+        }
+        @Override
+        public void onComplete() {
+            super.resume(sb.toString());
+        }
+
+        @Override
+        public void onNext(String s) {
+            sb.append(s);
+            super.requestNext();
+        }
+
+    }
+    
+    private static class ListAsyncSubscriber<T> extends AbstractSubscriber<T> {
+
+        private List<T> beans = new LinkedList<>();
+        ListAsyncSubscriber(AsyncResponse ar) {
+            super(ar);
+        }
+        @Override
+        public void onComplete() {
+            super.resume(beans);
+        }
+
+        @Override
+        public void onNext(T bean) {
+            beans.add(bean);
+            super.requestNext();
+        }
+
+    }
+}
+
+
diff --git 
a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3ObservableServer.java
 
b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3ObservableServer.java
new file mode 100644
index 0000000..0a8fcaa
--- /dev/null
+++ 
b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3ObservableServer.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.jaxrs.reactive;
+
+import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.BusFactory;
+import org.apache.cxf.ext.logging.LoggingOutInterceptor;
+import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
+import org.apache.cxf.jaxrs.lifecycle.SingletonResourceProvider;
+import org.apache.cxf.jaxrs.rx3.server.ReactiveIOInvoker;
+import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
+
+
+public class RxJava3ObservableServer extends AbstractBusTestServerBase {
+    public static final String PORT = 
allocatePort(RxJava3ObservableServer.class);
+
+    org.apache.cxf.endpoint.Server server;
+    public RxJava3ObservableServer() {
+    }
+
+    protected void run() {
+        Bus bus = BusFactory.getDefaultBus();
+        // Make sure default JSONProvider is not loaded
+        bus.setProperty("skip.default.json.provider.registration", true);
+        JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean();
+        sf.setInvoker(new ReactiveIOInvoker());
+        sf.setProvider(new JacksonJsonProvider());
+        sf.getOutInterceptors().add(new LoggingOutInterceptor());
+        sf.setResourceClasses(RxJava3ObservableService.class);
+        sf.setResourceProvider(RxJava3ObservableService.class,
+                               new SingletonResourceProvider(new 
RxJava3ObservableService(), true));
+        sf.setAddress("http://localhost:"; + PORT + "/");
+        server = sf.create();
+    }
+
+    public void tearDown() throws Exception {
+        server.stop();
+        server.destroy();
+        server = null;
+    }
+
+    public static void main(String[] args) {
+        try {
+            RxJava3ObservableServer s = new RxJava3ObservableServer();
+            s.start();
+        } catch (Exception ex) {
+            ex.printStackTrace();
+            System.exit(-1);
+        } finally {
+            System.out.println("done!");
+        }
+    }
+
+}
diff --git 
a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3ObservableService.java
 
b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3ObservableService.java
new file mode 100644
index 0000000..51d362c
--- /dev/null
+++ 
b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava3ObservableService.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.jaxrs.reactive;
+
+import java.util.Arrays;
+import java.util.List;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+
+import io.reactivex.rxjava3.core.Observable;
+
+@Path("/rx3/observable")
+public class RxJava3ObservableService {
+
+    @GET
+    @Produces("text/plain")
+    @Path("text")
+    public Observable<String> getText() {
+        return Observable.just("Hello, world!");
+    }
+    
+    @GET
+    @Produces("application/json")
+    @Path("textJson")
+    public Observable<HelloWorldBean> getJson() {
+        return Observable.just(new HelloWorldBean());
+    }
+    
+    @GET
+    @Produces("application/json")
+    @Path("textJsonList")
+    public Observable<List<HelloWorldBean>> getJsonList() {
+        HelloWorldBean bean1 = new HelloWorldBean();
+        HelloWorldBean bean2 = new HelloWorldBean();
+        bean2.setGreeting("Ciao");
+        return Observable.just(Arrays.asList(bean1, bean2));
+    }
+  
+}

Reply via email to