This is an automated email from the ASF dual-hosted git repository. sergeyb pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/cxf.git
The following commit(s) were added to refs/heads/master by this push: new da2970f [CXF-7556] Introducing cxf-rt-rs-extension-reactivestreams which uses JAX-RS and reactivestreams API only da2970f is described below commit da2970f221bacac5cc64997f0cad7bc1b319b7d3 Author: Sergey Beryozkin <sberyoz...@gmail.com> AuthorDate: Tue Nov 14 16:38:40 2017 +0000 [CXF-7556] Introducing cxf-rt-rs-extension-reactivestreams which uses JAX-RS and reactivestreams API only --- parent/pom.xml | 1 + rt/rs/{ => extensions/reactivestreams}/pom.xml | 40 +++---- .../server/AbstractSubscriber.java | 4 +- .../server/JsonStreamingAsyncSubscriber.java | 9 +- .../server/StreamingAsyncSubscriber.java | 2 +- .../jaxrs/reactor/server/AbstractSubscriber.java | 86 -------------- .../reactor/server/StreamingAsyncSubscriber.java | 126 --------------------- .../rx2/server/JsonStreamingAsyncSubscriber.java | 34 ------ rt/rs/pom.xml | 1 + systests/jaxrs/pom.xml | 6 + .../jaxrs/reactive/RxJava2FlowableService.java | 4 +- .../cxf/systest/jaxrs/reactor/FluxService.java | 4 +- .../cxf/systest/jaxrs/reactor/MonoService.java | 4 +- 13 files changed, 42 insertions(+), 279 deletions(-) diff --git a/parent/pom.xml b/parent/pom.xml index 7487b99..d65d100 100644 --- a/parent/pom.xml +++ b/parent/pom.xml @@ -142,6 +142,7 @@ <cxf.opensaml.osgi.version>3.3.0_2</cxf.opensaml.osgi.version> <cxf.opensaml.osgi.version.range>[3.1,4)</cxf.opensaml.osgi.version.range> <cxf.rhino.version>1.7R2</cxf.rhino.version> + <cxf.reactivestreams.version>1.0.1</cxf.reactivestreams.version> <cxf.reflections.bundle.version>0.9.10_3</cxf.reflections.bundle.version> <cxf.servlet-api.group>javax.servlet</cxf.servlet-api.group> <cxf.servlet-api.artifact>javax.servlet-api</cxf.servlet-api.artifact> diff --git a/rt/rs/pom.xml b/rt/rs/extensions/reactivestreams/pom.xml similarity index 60% copy from rt/rs/pom.xml copy to rt/rs/extensions/reactivestreams/pom.xml index 5a7b89a..1285746 100644 --- a/rt/rs/pom.xml +++ b/rt/rs/extensions/reactivestreams/pom.xml @@ -7,9 +7,9 @@ to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - + http://www.apache.org/licenses/LICENSE-2.0 - + Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -19,27 +19,27 @@ --> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> <modelVersion>4.0.0</modelVersion> - <artifactId>cxf-rt-rs</artifactId> - <packaging>pom</packaging> - <name>Apache CXF Rest Runtime</name> - <description>Apache CXF Rest Runtime</description> + <artifactId>cxf-rt-rs-extension-reactivestreams</artifactId> + <packaging>bundle</packaging> + <name>Apache CXF JAX-RS Extensions: Reactive Streams</name> + <description>Apache CXF JAX-RS Extensions: Reactive Streams</description> <url>http://cxf.apache.org</url> <parent> <groupId>org.apache.cxf</groupId> - <artifactId>cxf-rt</artifactId> + <artifactId>cxf-parent</artifactId> <version>3.2.2-SNAPSHOT</version> + <relativePath>../../../../parent/pom.xml</relativePath> </parent> - <modules> - <module>client</module> - <module>http-sci</module> - <module>description</module> - <module>description-swagger</module> - <module>extensions/json-basic</module> - <module>extensions/providers</module> - <module>extensions/search</module> - <module>extensions/rx</module> - <module>extensions/reactor</module> - <module>security</module> - <module>sse</module> - </modules> + <dependencies> + <dependency> + <groupId>org.apache.cxf</groupId> + <artifactId>cxf-rt-frontend-jaxrs</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.reactivestreams</groupId> + <artifactId>reactive-streams</artifactId> + <version>${cxf.reactivestreams.version}</version> + </dependency> + </dependencies> </project> diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/AbstractSubscriber.java b/rt/rs/extensions/reactivestreams/src/main/java/org/apache/cxf/jaxrs/reactivestreams/server/AbstractSubscriber.java similarity index 97% rename from rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/AbstractSubscriber.java rename to rt/rs/extensions/reactivestreams/src/main/java/org/apache/cxf/jaxrs/reactivestreams/server/AbstractSubscriber.java index 1010b3b..b7e4b59 100644 --- a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/AbstractSubscriber.java +++ b/rt/rs/extensions/reactivestreams/src/main/java/org/apache/cxf/jaxrs/reactivestreams/server/AbstractSubscriber.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.cxf.jaxrs.rx2.server; +package org.apache.cxf.jaxrs.reactivestreams.server; import java.util.List; @@ -85,4 +85,4 @@ public abstract class AbstractSubscriber<T> implements Subscriber<T> { protected final void request(long elements) { this.subscription.request(elements); } -} \ No newline at end of file +} diff --git a/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/JsonStreamingAsyncSubscriber.java b/rt/rs/extensions/reactivestreams/src/main/java/org/apache/cxf/jaxrs/reactivestreams/server/JsonStreamingAsyncSubscriber.java similarity index 83% rename from rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/JsonStreamingAsyncSubscriber.java rename to rt/rs/extensions/reactivestreams/src/main/java/org/apache/cxf/jaxrs/reactivestreams/server/JsonStreamingAsyncSubscriber.java index a5aa780..c302008 100644 --- a/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/JsonStreamingAsyncSubscriber.java +++ b/rt/rs/extensions/reactivestreams/src/main/java/org/apache/cxf/jaxrs/reactivestreams/server/JsonStreamingAsyncSubscriber.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.cxf.jaxrs.reactor.server; +package org.apache.cxf.jaxrs.reactivestreams.server; import javax.ws.rs.container.AsyncResponse; @@ -28,10 +28,11 @@ public class JsonStreamingAsyncSubscriber<T> extends StreamingAsyncSubscriber<T> this(ar, pollTimeout, 0); } public JsonStreamingAsyncSubscriber(AsyncResponse ar, long pollTimeout, long asyncTimeout) { - this(ar, "[", "]", ",", pollTimeout, asyncTimeout); + super(ar, "[", "]", ",", pollTimeout, asyncTimeout); } + public JsonStreamingAsyncSubscriber(AsyncResponse ar, String prefix, String suffix, String separator, - long pollTimeout, long asyncTimeout) { - super(ar, prefix, suffix, separator, pollTimeout, asyncTimeout); + long pollTimeout, long asyncTimeout) { + super(ar, prefix, suffix, separator, pollTimeout, asyncTimeout); } } diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/StreamingAsyncSubscriber.java b/rt/rs/extensions/reactivestreams/src/main/java/org/apache/cxf/jaxrs/reactivestreams/server/StreamingAsyncSubscriber.java similarity index 98% rename from rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/StreamingAsyncSubscriber.java rename to rt/rs/extensions/reactivestreams/src/main/java/org/apache/cxf/jaxrs/reactivestreams/server/StreamingAsyncSubscriber.java index 128f499..120c366 100644 --- a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/StreamingAsyncSubscriber.java +++ b/rt/rs/extensions/reactivestreams/src/main/java/org/apache/cxf/jaxrs/reactivestreams/server/StreamingAsyncSubscriber.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.cxf.jaxrs.rx2.server; +package org.apache.cxf.jaxrs.reactivestreams.server; import java.io.IOException; import java.util.concurrent.BlockingQueue; diff --git a/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/AbstractSubscriber.java b/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/AbstractSubscriber.java deleted file mode 100644 index 18950bc..0000000 --- a/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/AbstractSubscriber.java +++ /dev/null @@ -1,86 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.cxf.jaxrs.reactor.server; - -import java.util.List; -import javax.ws.rs.container.AsyncResponse; -import org.apache.cxf.jaxrs.ext.StreamingResponse; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; - -public abstract class AbstractSubscriber<T> implements Subscriber<T> { - - private AsyncResponse ar; - private Subscription subscription; - - protected AbstractSubscriber(AsyncResponse ar) { - this.ar = ar; - } - public void resume(T response) { - ar.resume(response); - } - - public void resume(List<T> response) { - ar.resume(response); - } - - public void resume(StreamingResponse<T> response) { - ar.resume(response); - } - - @Override - public void onError(Throwable t) { - ar.resume(t); - } - - @Override - public void onSubscribe(Subscription inSubscription) { - this.subscription = inSubscription; - requestAll(); - } - - @Override - public void onNext(T t) { - resume(t); - } - - @Override - public void onComplete() { - } - - protected AsyncResponse getAsyncResponse() { - return ar; - } - - protected Subscription getSubscription() { - return subscription; - } - - protected void requestNext() { - request(1); - } - - protected void requestAll() { - request(Long.MAX_VALUE); - } - - protected final void request(long elements) { - this.subscription.request(elements); - } -} \ No newline at end of file diff --git a/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/StreamingAsyncSubscriber.java b/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/StreamingAsyncSubscriber.java deleted file mode 100644 index 7b1efa1..0000000 --- a/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/StreamingAsyncSubscriber.java +++ /dev/null @@ -1,126 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.cxf.jaxrs.reactor.server; - -import java.io.IOException; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import javax.ws.rs.container.AsyncResponse; -import javax.ws.rs.container.TimeoutHandler; -import org.apache.cxf.common.util.StringUtils; -import org.apache.cxf.jaxrs.ext.StreamingResponse; -import org.reactivestreams.Subscription; - -public class StreamingAsyncSubscriber<T> extends AbstractSubscriber<T> { - private BlockingQueue<T> queue = new LinkedBlockingQueue<T>(); - private String openTag; - private String closeTag; - private String separator; - private long pollTimeout; - private long asyncTimeout; - private volatile boolean completed; - private AtomicBoolean firstWriteDone = new AtomicBoolean(); - public StreamingAsyncSubscriber(AsyncResponse ar, String openTag, String closeTag, String sep) { - this(ar, openTag, closeTag, sep, 1000); - } - public StreamingAsyncSubscriber(AsyncResponse ar, String openTag, String closeTag, String sep, - long pollTimeout) { - this(ar, openTag, closeTag, sep, pollTimeout, 0); - } - public StreamingAsyncSubscriber(AsyncResponse ar, String openTag, String closeTag, String sep, - long pollTimeout, long asyncTimeout) { - super(ar); - this.openTag = openTag; - this.closeTag = closeTag; - this.separator = sep; - this.pollTimeout = pollTimeout; - this.asyncTimeout = 0; - if (asyncTimeout > 0) { - ar.setTimeout(asyncTimeout, TimeUnit.MILLISECONDS); - ar.setTimeoutHandler(new TimeoutHandlerImpl()); - } - } - @Override - public void onSubscribe(Subscription subscription) { - if (asyncTimeout == 0) { - resumeAsyncResponse(); - } - super.onSubscribe(subscription); - } - @Override - public void onComplete() { - completed = true; - } - - @Override - public void onNext(T bean) { - if (asyncTimeout > 0 && getAsyncResponse().isSuspended()) { - resumeAsyncResponse(); - } - queue.add(bean); - super.requestNext(); - } - protected void resumeAsyncResponse() { - resume(new StreamingResponseImpl()); - } - - private class StreamingResponseImpl implements StreamingResponse<T> { - - @Override - public void writeTo(Writer<T> writer) throws IOException { - if (openTag != null) { - writer.getEntityStream().write(StringUtils.toBytesUTF8(openTag)); - } - while (!completed || !queue.isEmpty()) { - try { - T bean = queue.poll(pollTimeout, TimeUnit.MILLISECONDS); - if (bean != null) { - if (firstWriteDone.getAndSet(true)) { - writer.getEntityStream().write(StringUtils.toBytesUTF8(separator)); - } - writer.write(bean); - } - } catch (InterruptedException ex) { - // ignore - } - } - if (closeTag != null) { - writer.getEntityStream().write(StringUtils.toBytesUTF8(closeTag)); - } - - } - - } - public class TimeoutHandlerImpl implements TimeoutHandler { - - @Override - public void handleTimeout(AsyncResponse asyncResponse) { - if (queue.isEmpty()) { - asyncResponse.setTimeout(asyncTimeout, TimeUnit.MILLISECONDS); - } else { - resumeAsyncResponse(); - } - - } - - } -} - diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/JsonStreamingAsyncSubscriber.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/JsonStreamingAsyncSubscriber.java deleted file mode 100644 index c809799..0000000 --- a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/JsonStreamingAsyncSubscriber.java +++ /dev/null @@ -1,34 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.cxf.jaxrs.rx2.server; - -import javax.ws.rs.container.AsyncResponse; - -public class JsonStreamingAsyncSubscriber<T> extends StreamingAsyncSubscriber<T> { - public JsonStreamingAsyncSubscriber(AsyncResponse ar) { - this(ar, 1000); - } - public JsonStreamingAsyncSubscriber(AsyncResponse ar, long pollTimeout) { - this(ar, pollTimeout, 0); - } - public JsonStreamingAsyncSubscriber(AsyncResponse ar, long pollTimeout, long asyncTimeout) { - super(ar, "[", "]", ",", pollTimeout, asyncTimeout); - } - -} diff --git a/rt/rs/pom.xml b/rt/rs/pom.xml index 5a7b89a..eebe14b 100644 --- a/rt/rs/pom.xml +++ b/rt/rs/pom.xml @@ -39,6 +39,7 @@ <module>extensions/search</module> <module>extensions/rx</module> <module>extensions/reactor</module> + <module>extensions/reactivestreams</module> <module>security</module> <module>sse</module> </modules> diff --git a/systests/jaxrs/pom.xml b/systests/jaxrs/pom.xml index d8ea6e6..9619e39 100644 --- a/systests/jaxrs/pom.xml +++ b/systests/jaxrs/pom.xml @@ -70,6 +70,12 @@ <artifactId>reactor-core</artifactId> </dependency> <dependency> + <groupId>org.apache.cxf</groupId> + <artifactId>cxf-rt-rs-extension-reactivestreams</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <dependency> <groupId>javax.el</groupId> <artifactId>javax.el-api</artifactId> <version>${cxf.el.api.version}</version> diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2FlowableService.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2FlowableService.java index 05b1dd0..a995edc 100644 --- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2FlowableService.java +++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2FlowableService.java @@ -29,8 +29,8 @@ import javax.ws.rs.Produces; import javax.ws.rs.container.AsyncResponse; import javax.ws.rs.container.Suspended; -import org.apache.cxf.jaxrs.rx2.server.AbstractSubscriber; -import org.apache.cxf.jaxrs.rx2.server.JsonStreamingAsyncSubscriber; +import org.apache.cxf.jaxrs.reactivestreams.server.AbstractSubscriber; +import org.apache.cxf.jaxrs.reactivestreams.server.JsonStreamingAsyncSubscriber; import io.reactivex.Flowable; import io.reactivex.schedulers.Schedulers; diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/FluxService.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/FluxService.java index 3584636..79a6c7f 100644 --- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/FluxService.java +++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/FluxService.java @@ -24,7 +24,7 @@ import javax.ws.rs.Path; import javax.ws.rs.Produces; import javax.ws.rs.container.AsyncResponse; import javax.ws.rs.container.Suspended; -import org.apache.cxf.jaxrs.reactor.server.JsonStreamingAsyncSubscriber; +import org.apache.cxf.jaxrs.reactivestreams.server.JsonStreamingAsyncSubscriber; import reactor.core.publisher.Flux; import reactor.core.scheduler.Schedulers; @@ -47,4 +47,4 @@ public class FluxService { .subscribeOn(Schedulers.parallel()) .subscribe(new JsonStreamingAsyncSubscriber<>(ar)); } -} \ No newline at end of file +} diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoService.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoService.java index ce50dca..8ded540 100644 --- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoService.java +++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoService.java @@ -24,8 +24,8 @@ import javax.ws.rs.Path; import javax.ws.rs.Produces; import javax.ws.rs.container.AsyncResponse; import javax.ws.rs.container.Suspended; -import org.apache.cxf.jaxrs.reactor.server.AbstractSubscriber; -import org.apache.cxf.jaxrs.reactor.server.JsonStreamingAsyncSubscriber; +import org.apache.cxf.jaxrs.reactivestreams.server.AbstractSubscriber; +import org.apache.cxf.jaxrs.reactivestreams.server.JsonStreamingAsyncSubscriber; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; -- To stop receiving notification emails like this one, please contact ['"commits@cxf.apache.org" <commits@cxf.apache.org>'].