Repository: cxf Updated Branches: refs/heads/master 8e753ad9d -> 502db47a7
[CXF-6833] Better support for implicit lists, simpler async subscription code Project: http://git-wip-us.apache.org/repos/asf/cxf/repo Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/502db47a Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/502db47a Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/502db47a Branch: refs/heads/master Commit: 502db47a7c520767da2977376be5cf2fce3f56af Parents: 8e753ad Author: Sergey Beryozkin <[email protected]> Authored: Tue Aug 30 18:02:32 2016 +0100 Committer: Sergey Beryozkin <[email protected]> Committed: Tue Aug 30 18:02:32 2016 +0100 ---------------------------------------------------------------------- .../provider/rx/AbstractAsyncSubscriber.java | 44 ++++++++++++++++++ .../cxf/jaxrs/provider/rx/ObservableWriter.java | 47 +++++++++++++++++--- .../jaxrs/reactive/JAXRSReactiveTest.java | 36 +++++++++++++-- .../systest/jaxrs/reactive/ReactiveServer.java | 2 + .../systest/jaxrs/reactive/ReactiveService.java | 44 ++++++++++++------ 5 files changed, 149 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cxf/blob/502db47a/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/AbstractAsyncSubscriber.java ---------------------------------------------------------------------- diff --git a/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/AbstractAsyncSubscriber.java b/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/AbstractAsyncSubscriber.java new file mode 100644 index 0000000..ae4459c --- /dev/null +++ b/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/AbstractAsyncSubscriber.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cxf.jaxrs.provider.rx; + +import javax.ws.rs.container.AsyncResponse; + +import rx.Subscriber; + +public abstract class AbstractAsyncSubscriber<T> extends Subscriber<T> { + + private AsyncResponse ar; + + protected AbstractAsyncSubscriber(AsyncResponse ar) { + this.ar = ar; + } + public void resume(T response) { + ar.resume(response); + } + + @Override + public void onError(Throwable t) { + ar.resume(t); + } + + protected AsyncResponse getAsyncResponse() { + return ar; + } +} http://git-wip-us.apache.org/repos/asf/cxf/blob/502db47a/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/ObservableWriter.java ---------------------------------------------------------------------- diff --git a/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/ObservableWriter.java b/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/ObservableWriter.java index 929709b..6317506 100644 --- a/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/ObservableWriter.java +++ b/rt/rs/extensions/providers/src/main/java/org/apache/cxf/jaxrs/provider/rx/ObservableWriter.java @@ -22,22 +22,28 @@ import java.io.IOException; import java.io.OutputStream; import java.lang.annotation.Annotation; import java.lang.reflect.Type; +import java.util.LinkedList; +import java.util.List; import javax.ws.rs.WebApplicationException; import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.MultivaluedMap; import javax.ws.rs.ext.MessageBodyWriter; +import javax.ws.rs.ext.Provider; import javax.ws.rs.ext.Providers; import org.apache.cxf.jaxrs.utils.ExceptionUtils; +import org.apache.cxf.jaxrs.utils.ParameterizedCollectionType; import rx.Observable; +@Provider public class ObservableWriter<T> implements MessageBodyWriter<Observable<T>> { @Context private Providers providers; + private boolean writeSingleElementAsList; @Override public long getSize(Observable<T> arg0, Class<?> arg1, Type arg2, Annotation[] arg3, MediaType arg4) { @@ -54,24 +60,49 @@ public class ObservableWriter<T> implements MessageBodyWriter<Observable<T>> { public void writeTo(Observable<T> obs, Class<?> cls, Type t, Annotation[] anns, MediaType mt, MultivaluedMap<String, Object> headers, OutputStream os) throws IOException, WebApplicationException { - obs.subscribe(value -> writeToOutputStream(value, anns, mt, headers, os), - throwable -> throwError(throwable)); + List<T> entities = new LinkedList<T>(); + obs.subscribe(value -> entities.add(value), + throwable -> throwError(throwable)); + if (!entities.isEmpty()) { + + if (entities.get(0) instanceof List) { + List<T> allEntities = new LinkedList<T>(); + for (T obj : entities) { + @SuppressWarnings("unchecked") + List<T> listT = (List<T>)obj; + allEntities.addAll(listT); + } + writeToOutputStream(allEntities, anns, mt, headers, os); + } else if (entities.size() > 1 || writeSingleElementAsList) { + writeToOutputStream(entities, anns, mt, headers, os); + } else { + writeToOutputStream(entities.get(0), anns, mt, headers, os); + } + } } - private void writeToOutputStream(T value, + private void writeToOutputStream(Object value, Annotation[] anns, MediaType mt, MultivaluedMap<String, Object> headers, OutputStream os) { + Class<?> valueCls = value.getClass(); + Type valueType = null; + if (value instanceof List) { + List<?> list = (List<?>)value; + valueType = new ParameterizedCollectionType(list.isEmpty() ? Object.class : list.get(0).getClass()); + } else { + valueType = valueCls; + } @SuppressWarnings("unchecked") - MessageBodyWriter<T> writer = - (MessageBodyWriter<T>)providers.getMessageBodyWriter(value.getClass(), value.getClass(), anns, mt); + MessageBodyWriter<Object> writer = + (MessageBodyWriter<Object>)providers.getMessageBodyWriter(valueCls, valueType, anns, mt); if (writer == null) { throwError(null); } try { - writer.writeTo(value, value.getClass(), value.getClass(), anns, mt, headers, os); + writer.writeTo(value, valueCls, valueType, anns, mt, headers, os); } catch (IOException ex) { throwError(ex); } @@ -81,4 +112,8 @@ public class ObservableWriter<T> implements MessageBodyWriter<Observable<T>> { throw ExceptionUtils.toInternalServerErrorException(cause, null); } + public void setWriteSingleElementAsList(boolean writeSingleElementAsList) { + this.writeSingleElementAsList = writeSingleElementAsList; + } + } http://git-wip-us.apache.org/repos/asf/cxf/blob/502db47a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSReactiveTest.java ---------------------------------------------------------------------- diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSReactiveTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSReactiveTest.java index 535831d..f82d139 100644 --- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSReactiveTest.java +++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSReactiveTest.java @@ -20,10 +20,13 @@ package org.apache.cxf.systest.jaxrs.reactive; import java.util.Collections; +import java.util.List; import java.util.concurrent.Future; import javax.ws.rs.core.GenericType; +import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider; + import org.apache.cxf.jaxrs.client.WebClient; import org.apache.cxf.jaxrs.model.AbstractResourceInfo; import org.apache.cxf.jaxrs.provider.rx.ObservableReader; @@ -85,10 +88,35 @@ public class JAXRSReactiveTest extends AbstractBusClientServerTestBase { @Test public void testGetHelloWorldJson() throws Exception { String address = "http://localhost:" + PORT + "/reactive/textJson"; - WebClient wc = WebClient.create(address); - String text = wc.accept("application/json").get(String.class); - assertTrue("{\"audience\":\"World\",\"greeting\":\"Hello\"}".equals(text) - || "{\"greeting\":\"Hello\",\"audience\":\"World\"}".equals(text)); + WebClient wc = WebClient.create(address, + Collections.singletonList(new JacksonJsonProvider())); + HelloWorldBean bean = wc.accept("application/json").get(HelloWorldBean.class); + assertEquals("Hello", bean.getGreeting()); + assertEquals("World", bean.getAudience()); + } + @Test + public void testGetHelloWorldJsonList() throws Exception { + String address = "http://localhost:" + PORT + "/reactive/textJsonList"; + doTestGetHelloWorldJsonList(address); + } + @Test + public void testGetHelloWorldJsonImplicitList() throws Exception { + String address = "http://localhost:" + PORT + "/reactive/textJsonImplicitList"; + doTestGetHelloWorldJsonList(address); + } + private void doTestGetHelloWorldJsonList(String address) throws Exception { + WebClient wc = WebClient.create(address, + Collections.singletonList(new JacksonJsonProvider())); + WebClient.getConfig(wc).getHttpConduit().getClient().setReceiveTimeout(10000000); + GenericType<List<HelloWorldBean>> genericResponseType = new GenericType<List<HelloWorldBean>>() { + }; + + List<HelloWorldBean> beans = wc.accept("application/json").get(genericResponseType); + assertEquals(2, beans.size()); + assertEquals("Hello", beans.get(0).getGreeting()); + assertEquals("World", beans.get(0).getAudience()); + assertEquals("Ciao", beans.get(1).getGreeting()); + assertEquals("World", beans.get(1).getAudience()); } private Observable<String> getObservable(Future<String> future) { http://git-wip-us.apache.org/repos/asf/cxf/blob/502db47a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveServer.java ---------------------------------------------------------------------- diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveServer.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveServer.java index 09cbecc..ced133b 100644 --- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveServer.java +++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveServer.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider; import org.apache.cxf.Bus; import org.apache.cxf.BusFactory; +import org.apache.cxf.interceptor.LoggingOutInterceptor; import org.apache.cxf.jaxrs.JAXRSServerFactoryBean; import org.apache.cxf.jaxrs.lifecycle.SingletonResourceProvider; import org.apache.cxf.jaxrs.provider.rx.ObservableWriter; @@ -42,6 +43,7 @@ public class ReactiveServer extends AbstractBusTestServerBase { JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean(); sf.setProvider(new ObservableWriter<Object>()); sf.setProvider(new JacksonJsonProvider()); + sf.getOutInterceptors().add(new LoggingOutInterceptor()); sf.setResourceClasses(ReactiveService.class); sf.setResourceProvider(ReactiveService.class, new SingletonResourceProvider(new ReactiveService(), true)); http://git-wip-us.apache.org/repos/asf/cxf/blob/502db47a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveService.java ---------------------------------------------------------------------- diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveService.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveService.java index f716bbc..5d77969 100644 --- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveService.java +++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ReactiveService.java @@ -20,14 +20,18 @@ package org.apache.cxf.systest.jaxrs.reactive; +import java.util.Arrays; +import java.util.List; + import javax.ws.rs.GET; import javax.ws.rs.Path; import javax.ws.rs.Produces; import javax.ws.rs.container.AsyncResponse; import javax.ws.rs.container.Suspended; +import org.apache.cxf.jaxrs.provider.rx.AbstractAsyncSubscriber; + import rx.Observable; -import rx.Subscriber; @Path("/reactive") @@ -45,33 +49,45 @@ public class ReactiveService { @Path("textAsync") public void getTextAsync(@Suspended final AsyncResponse ar) { Observable.just("Hello, ").map(s -> s + "world!") - .subscribe(new AsyncResponseSubscriber(ar)); + .subscribe(new StringAsyncSubscriber(ar)); } @GET @Produces("application/json") @Path("textJson") - public Observable<HelloWorldBean> getJsonText() { + public Observable<HelloWorldBean> getJson() { return Observable.just(new HelloWorldBean()); } - private class AsyncResponseSubscriber extends Subscriber<String> { + @GET + @Produces("application/json") + @Path("textJsonImplicitList") + public Observable<HelloWorldBean> getJsonImplicitList() { + HelloWorldBean bean1 = new HelloWorldBean(); + HelloWorldBean bean2 = new HelloWorldBean(); + bean2.setGreeting("Ciao"); + return Observable.just(bean1, bean2); + } + @GET + @Produces("application/json") + @Path("textJsonList") + public Observable<List<HelloWorldBean>> getJsonList() { + HelloWorldBean bean1 = new HelloWorldBean(); + HelloWorldBean bean2 = new HelloWorldBean(); + bean2.setGreeting("Ciao"); + return Observable.just(Arrays.asList(bean1, bean2)); + } + + private class StringAsyncSubscriber extends AbstractAsyncSubscriber<String> { private StringBuilder sb = new StringBuilder(); - private AsyncResponse ar; - - AsyncResponseSubscriber(AsyncResponse ar) { - this.ar = ar; + StringAsyncSubscriber(AsyncResponse ar) { + super(ar); } @Override public void onCompleted() { - ar.resume(sb.toString()); - } - - @Override - public void onError(Throwable arg0) { - // TODO Auto-generated method stub + super.resume(sb.toString()); } @Override
