Repository: cxf Updated Branches: refs/heads/master b114d6cba -> 1224d5cc9
[CXF-6833] Closer integration with AsyncResponse Project: http://git-wip-us.apache.org/repos/asf/cxf/repo Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/1224d5cc Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/1224d5cc Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/1224d5cc Branch: refs/heads/master Commit: 1224d5cc9f646aa3a4924ce9cb045366d2f4cdff Parents: b114d6c Author: Sergey Beryozkin <[email protected]> Authored: Wed Aug 31 16:52:59 2016 +0100 Committer: Sergey Beryozkin <[email protected]> Committed: Wed Aug 31 16:52:59 2016 +0100 ---------------------------------------------------------------------- .../provider/StreamingResponseProvider.java | 4 +- .../provider/rx/AbstractAsyncSubscriber.java | 12 ++ .../rx/JsonStreamingAsyncSubscriber.java | 31 +++++ .../jaxrs/provider/rx/ListAsyncSubscriber.java | 42 +++++++ .../provider/rx/StreamingAsyncSubscriber.java | 125 +++++++++++++++++++ .../systest/jaxrs/reactive/HelloWorldBean.java | 9 +- .../jaxrs/reactive/JAXRSReactiveTest.java | 10 ++ .../systest/jaxrs/reactive/ReactiveServer.java | 7 ++ .../systest/jaxrs/reactive/ReactiveService.java | 33 ++++- 9 files changed, 268 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cxf/blob/1224d5cc/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/provider/StreamingResponseProvider.java ---------------------------------------------------------------------- diff --git a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/provider/StreamingResponseProvider.java b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/provider/StreamingResponseProvider.java index 2749d7c..e33728d 100644 --- a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/provider/StreamingResponseProvider.java +++ b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/provider/StreamingResponseProvider.java @@ -34,8 +34,8 @@ import javax.ws.rs.ext.Providers; import 
org.apache.cxf.jaxrs.ext.StreamingResponse; import org.apache.cxf.jaxrs.utils.InjectionUtils; -public class StreamingResponseProvider<T> implements - MessageBodyWriter<StreamingResponse<T>> { +public class StreamingResponseProvider<T> extends AbstractConfigurableProvider + implements MessageBodyWriter<StreamingResponse<T>> { @Context private Providers providers; http://git-wip-us.apache.org/repos/asf/cxf/blob/1224d5cc/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/AbstractAsyncSubscriber.java ---------------------------------------------------------------------- diff --git a/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/AbstractAsyncSubscriber.java b/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/AbstractAsyncSubscriber.java index ae4459c..c49144f 100644 --- a/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/AbstractAsyncSubscriber.java +++ b/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/AbstractAsyncSubscriber.java @@ -18,8 +18,12 @@ */ package org.apache.cxf.jaxrs.provider.rx; +import java.util.List; + import javax.ws.rs.container.AsyncResponse; +import org.apache.cxf.jaxrs.ext.StreamingResponse; + import rx.Subscriber; public abstract class AbstractAsyncSubscriber<T> extends Subscriber<T> { @@ -32,6 +36,14 @@ public abstract class AbstractAsyncSubscriber<T> extends Subscriber<T> { public void resume(T response) { ar.resume(response); } + + public void resume(List<T> response) { + ar.resume(response); + } + + public void resume(StreamingResponse<T> response) { + ar.resume(response); + } @Override public void onError(Throwable t) { http://git-wip-us.apache.org/repos/asf/cxf/blob/1224d5cc/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/JsonStreamingAsyncSubscriber.java ---------------------------------------------------------------------- diff --git 
a/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/JsonStreamingAsyncSubscriber.java b/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/JsonStreamingAsyncSubscriber.java new file mode 100644 index 0000000..5e36e9c --- /dev/null +++ b/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/JsonStreamingAsyncSubscriber.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.cxf.jaxrs.provider.rx; + +import javax.ws.rs.container.AsyncResponse; +/** StreamingAsyncSubscriber preset that passes "[", "]" and "," as open tag, close tag and separator. */ +public class JsonStreamingAsyncSubscriber<T> extends StreamingAsyncSubscriber<T> { + public JsonStreamingAsyncSubscriber(AsyncResponse ar) { + this(ar, 1000); /* default pollTimeout: 1000 ms */ + } + public JsonStreamingAsyncSubscriber(AsyncResponse ar, long pollTimeout) { + super(ar, "[", "]", ",", pollTimeout); + } + +} http://git-wip-us.apache.org/repos/asf/cxf/blob/1224d5cc/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/ListAsyncSubscriber.java ---------------------------------------------------------------------- diff --git a/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/ListAsyncSubscriber.java b/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/ListAsyncSubscriber.java new file mode 100644 index 0000000..6bfb1cb --- /dev/null +++ b/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/ListAsyncSubscriber.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.cxf.jaxrs.provider.rx; + +import java.util.ArrayList; +import java.util.List; + +import javax.ws.rs.container.AsyncResponse; +/** Buffers every onNext() item and resumes the AsyncResponse with the full list in onCompleted(). */ +public class ListAsyncSubscriber<T> extends AbstractAsyncSubscriber<T> { + + private final List<T> beans = new ArrayList<T>(); /* append-only buffer, handed over as a whole */ + public ListAsyncSubscriber(AsyncResponse ar) { + super(ar); + } + @Override + public void onCompleted() { + super.resume(beans); + } + + @Override + public void onNext(T bean) { + beans.add(bean); + } + +} http://git-wip-us.apache.org/repos/asf/cxf/blob/1224d5cc/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/StreamingAsyncSubscriber.java ---------------------------------------------------------------------- diff --git a/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/StreamingAsyncSubscriber.java b/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/StreamingAsyncSubscriber.java new file mode 100644 index 0000000..48a5ac4 --- /dev/null +++ b/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/StreamingAsyncSubscriber.java @@ -0,0 +1,125 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.cxf.jaxrs.provider.rx; + +import java.io.IOException; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import javax.ws.rs.container.AsyncResponse; +import javax.ws.rs.container.TimeoutHandler; + +import org.apache.commons.codec.binary.StringUtils; +import org.apache.cxf.jaxrs.ext.StreamingResponse; +/** Queues onNext() items and streams them as openTag, items joined by separator, then closeTag. */ +public class StreamingAsyncSubscriber<T> extends AbstractAsyncSubscriber<T> { + + private BlockingQueue<T> queue = new LinkedBlockingQueue<T>(); + private String openTag; + private String closeTag; + private String separator; + private long pollTimeout; + private long asyncTimeout; + private volatile boolean completed; + private volatile boolean firstWriteDone; + public StreamingAsyncSubscriber(AsyncResponse ar, String openTag, String closeTag, String sep) { + this(ar, openTag, closeTag, sep, 1000); /* default pollTimeout: 1000 ms */ + } + public StreamingAsyncSubscriber(AsyncResponse ar, String openTag, String closeTag, String sep, + long pollTimeout) { + this(ar, openTag, closeTag, sep, pollTimeout, 0); /* asyncTimeout 0: resume in onStart() */ + } + public StreamingAsyncSubscriber(AsyncResponse ar, String openTag, String closeTag, String sep, + long pollTimeout, long asyncTimeout) { + super(ar); + this.openTag = openTag; + this.closeTag = closeTag; + this.separator = sep == null ? "" : sep; /* writeTo() encodes the separator without a null check */ + this.pollTimeout = pollTimeout; + this.asyncTimeout = asyncTimeout; /* read by onStart(), onNext() and the timeout handler */ + if (asyncTimeout > 0) { + ar.setTimeout(asyncTimeout, TimeUnit.MILLISECONDS); + ar.setTimeoutHandler(new TimeoutHandlerImpl()); + } + } + @Override + public void onStart() { + if (asyncTimeout == 0) { + resumeAsyncResponse(); + } + } + private void resumeAsyncResponse() { + super.resume(new StreamingResponseImpl()); + } + @Override + public void onCompleted() { + completed = true; + } + + @Override + public void onNext(T bean) { + if (asyncTimeout > 0 && getAsyncResponse().isSuspended()) { + resumeAsyncResponse(); + } + queue.add(bean); + } + private class StreamingResponseImpl implements 
StreamingResponse<T> { + + @Override + public void writeTo(Writer<T> writer) throws IOException { + if (openTag != null) { + writer.getEntityStream().write(StringUtils.getBytesUtf8(openTag)); + } + while (!completed || queue.size() > 0) { /* drain until completed and the queue is empty */ + try { + T bean = queue.poll(pollTimeout, TimeUnit.MILLISECONDS); + if (bean != null) { + if (firstWriteDone) { + writer.getEntityStream().write(StringUtils.getBytesUtf8(separator)); + } + writer.write(bean); + firstWriteDone = true; + + } + } catch (InterruptedException ex) { + // ignore + } + } + if (closeTag != null) { + writer.getEntityStream().write(StringUtils.getBytesUtf8(closeTag)); + } + + } + + } + public class TimeoutHandlerImpl implements TimeoutHandler { + + @Override + public void handleTimeout(AsyncResponse asyncResponse) { + if (queue.isEmpty() && !completed) { /* nothing queued and not completed: re-arm the timeout */ + asyncResponse.setTimeout(asyncTimeout, TimeUnit.MILLISECONDS); + } else { /* items are queued or onCompleted() has run: start writing */ + resumeAsyncResponse(); + } + + } + + } +} http://git-wip-us.apache.org/repos/asf/cxf/blob/1224d5cc/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/HelloWorldBean.java ---------------------------------------------------------------------- diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/HelloWorldBean.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/HelloWorldBean.java index c73a63d..79dc6f2 100644 --- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/HelloWorldBean.java +++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/HelloWorldBean.java @@ -19,8 +19,15 @@ package org.apache.cxf.systest.jaxrs.reactive; public class HelloWorldBean { - private String greeting = "Hello"; + private String greeting; private String audience = "World"; + public HelloWorldBean() { + this("Hello"); + } + public HelloWorldBean(String greeting) { + this.greeting = greeting; + } + public String getGreeting() { return greeting; } 
http://git-wip-us.apache.org/repos/asf/cxf/blob/1224d5cc/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSReactiveTest.java ---------------------------------------------------------------------- diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSReactiveTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSReactiveTest.java index f82d139..45a9468 100644 --- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSReactiveTest.java +++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSReactiveTest.java @@ -104,6 +104,16 @@ public class JAXRSReactiveTest extends AbstractBusClientServerTestBase { String address = "http://localhost:" + PORT + "/reactive/textJsonImplicitList"; doTestGetHelloWorldJsonList(address); } + @Test + public void testGetHelloWorldJsonImplicitListAsync() throws Exception { + String address = "http://localhost:" + PORT + "/reactive/textJsonImplicitListAsync"; + doTestGetHelloWorldJsonList(address); + } + @Test + public void testGetHelloWorldJsonImplicitListAsyncStream() throws Exception { + String address = "http://localhost:" + PORT + "/reactive/textJsonImplicitListAsyncStream"; + doTestGetHelloWorldJsonList(address); + } private void doTestGetHelloWorldJsonList(String address) throws Exception { WebClient wc = WebClient.create(address, Collections.singletonList(new JacksonJsonProvider())); http://git-wip-us.apache.org/repos/asf/cxf/blob/1224d5cc/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveServer.java ---------------------------------------------------------------------- diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveServer.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveServer.java index ced133b..d12641a 100644 --- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveServer.java +++ 
b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveServer.java @@ -19,6 +19,8 @@ package org.apache.cxf.systest.jaxrs.reactive; +import java.util.Collections; + import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider; import org.apache.cxf.Bus; @@ -26,8 +28,10 @@ import org.apache.cxf.BusFactory; import org.apache.cxf.interceptor.LoggingOutInterceptor; import org.apache.cxf.jaxrs.JAXRSServerFactoryBean; import org.apache.cxf.jaxrs.lifecycle.SingletonResourceProvider; +import org.apache.cxf.jaxrs.provider.StreamingResponseProvider; import org.apache.cxf.jaxrs.provider.rx.ObservableWriter; import org.apache.cxf.testutil.common.AbstractBusTestServerBase; + public class ReactiveServer extends AbstractBusTestServerBase { public static final String PORT = allocatePort(ReactiveServer.class); @@ -43,6 +47,9 @@ public class ReactiveServer extends AbstractBusTestServerBase { JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean(); sf.setProvider(new ObservableWriter<Object>()); sf.setProvider(new JacksonJsonProvider()); + StreamingResponseProvider<HelloWorldBean> streamProvider = new StreamingResponseProvider<HelloWorldBean>(); + streamProvider.setProduceMediaTypes(Collections.singletonList("application/json")); + sf.setProvider(streamProvider); sf.getOutInterceptors().add(new LoggingOutInterceptor()); sf.setResourceClasses(ReactiveService.class); sf.setResourceProvider(ReactiveService.class, http://git-wip-us.apache.org/repos/asf/cxf/blob/1224d5cc/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveService.java ---------------------------------------------------------------------- diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveService.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveService.java index 5d77969..6081f2b 100644 --- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveService.java +++ 
b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveService.java @@ -30,8 +30,11 @@ import javax.ws.rs.container.AsyncResponse; import javax.ws.rs.container.Suspended; import org.apache.cxf.jaxrs.provider.rx.AbstractAsyncSubscriber; +import org.apache.cxf.jaxrs.provider.rx.JsonStreamingAsyncSubscriber; +import org.apache.cxf.jaxrs.provider.rx.ListAsyncSubscriber; import rx.Observable; +import rx.schedulers.Schedulers; @Path("/reactive") @@ -65,12 +68,38 @@ public class ReactiveService { @Path("textJsonImplicitList") public Observable<HelloWorldBean> getJsonImplicitList() { HelloWorldBean bean1 = new HelloWorldBean(); - HelloWorldBean bean2 = new HelloWorldBean(); - bean2.setGreeting("Ciao"); + HelloWorldBean bean2 = new HelloWorldBean("Ciao"); return Observable.just(bean1, bean2); } @GET @Produces("application/json") + @Path("textJsonImplicitListAsync") + public void getJsonImplicitListAsync(@Suspended AsyncResponse ar) { + final HelloWorldBean bean1 = new HelloWorldBean(); + final HelloWorldBean bean2 = new HelloWorldBean("Ciao"); + new Thread(new Runnable() { + public void run() { + try { + Thread.sleep(2000); + } catch (InterruptedException ex) { + // ignore + } + Observable.just(bean1, bean2).subscribe(new ListAsyncSubscriber<HelloWorldBean>(ar)); + } + }).start(); + + } + @GET + @Produces("application/json") + @Path("textJsonImplicitListAsyncStream") + public void getJsonImplicitListStreamingAsync(@Suspended AsyncResponse ar) { + Observable.just("Hello", "Ciao") + .map(s -> new HelloWorldBean(s)) + .subscribeOn(Schedulers.computation()) + .subscribe(new JsonStreamingAsyncSubscriber<HelloWorldBean>(ar)); + } + @GET + @Produces("application/json") @Path("textJsonList") public Observable<List<HelloWorldBean>> getJsonList() { HelloWorldBean bean1 = new HelloWorldBean();
