Repository: cxf
Updated Branches:
  refs/heads/master ed0ab4cb9 -> 3f1542f09
[CXF-5639] Introducing StreamingResponse, can be used with/without WebSocket Project: http://git-wip-us.apache.org/repos/asf/cxf/repo Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/3f1542f0 Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/3f1542f0 Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/3f1542f0 Branch: refs/heads/master Commit: 3f1542f09664fc185546e24ffb4699233da0eec2 Parents: ed0ab4c Author: Sergey Beryozkin <[email protected]> Authored: Tue Mar 25 10:59:23 2014 +0000 Committer: Sergey Beryozkin <[email protected]> Committed: Tue Mar 25 10:59:23 2014 +0000 ---------------------------------------------------------------------- .../apache/cxf/jaxrs/ext/StreamingResponse.java | 30 ++++++ .../provider/StreamingResponseProvider.java | 104 +++++++++++++++++++ .../jaxrs/websocket/BookServerWebSocket.java | 3 + .../jaxrs/websocket/BookStoreWebSocket.java | 24 +++++ .../JAXRSClientServerWebSocketTest.java | 33 ++++++ .../resources/jaxrs_websocket/WEB-INF/beans.xml | 3 + .../jaxrs_websocket/beans-embedded.xml | 3 + 7 files changed, 200 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cxf/blob/3f1542f0/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/ext/StreamingResponse.java ---------------------------------------------------------------------- diff --git a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/ext/StreamingResponse.java b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/ext/StreamingResponse.java new file mode 100644 index 0000000..e16a4ef --- /dev/null +++ b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/ext/StreamingResponse.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cxf.jaxrs.ext; + +import java.io.IOException; +import java.io.OutputStream; + +public interface StreamingResponse<T> { + interface Writer<T> { + void write(T data) throws IOException; + OutputStream getEntityStream(); + } + void writeTo(Writer<T> writer) throws IOException; +} http://git-wip-us.apache.org/repos/asf/cxf/blob/3f1542f0/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/provider/StreamingResponseProvider.java ---------------------------------------------------------------------- diff --git a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/provider/StreamingResponseProvider.java b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/provider/StreamingResponseProvider.java new file mode 100644 index 0000000..1154df8 --- /dev/null +++ b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/provider/StreamingResponseProvider.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cxf.jaxrs.provider; + +import java.io.IOException; +import java.io.OutputStream; +import java.lang.annotation.Annotation; +import java.lang.reflect.Type; + +import javax.ws.rs.InternalServerErrorException; +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.MultivaluedMap; +import javax.ws.rs.ext.MessageBodyWriter; +import javax.ws.rs.ext.Providers; + +import org.apache.cxf.jaxrs.ext.StreamingResponse; +import org.apache.cxf.jaxrs.utils.InjectionUtils; + +public class StreamingResponseProvider<T> implements + MessageBodyWriter<StreamingResponse<T>> { + + @Context + private Providers providers; + + @Override + public boolean isWriteable(Class<?> cls, Type type, Annotation[] anns, MediaType mt) { + return StreamingResponse.class.isAssignableFrom(cls); + } + + @Override + public void writeTo(StreamingResponse<T> p, Class<?> cls, Type t, Annotation[] anns, + MediaType mt, MultivaluedMap<String, Object> headers, OutputStream os) + throws IOException, WebApplicationException { + Class<?> actualCls = InjectionUtils.getActualType(t); + @SuppressWarnings("unchecked") + MessageBodyWriter<T> writer = + (MessageBodyWriter<T>)providers.getMessageBodyWriter(actualCls, actualCls, anns, mt); + if (writer == null) { + throw new InternalServerErrorException(); + } + //TODO: review the possibility of caching the providers + StreamingProviderWriterImpl thewriter = + new StreamingProviderWriterImpl(writer, actualCls, anns, mt, headers, os); + 
p.writeTo(thewriter); + } + + @Override + public long getSize(StreamingResponse<T> arg0, Class<?> arg1, Type arg2, Annotation[] arg3, MediaType arg4) { + return -1; + } + + private class StreamingProviderWriterImpl implements StreamingResponse.Writer<T> { + private MessageBodyWriter<T> writer; + private Class<?> cls; + private MediaType mt; + private Annotation[] anns; + private MultivaluedMap<String, Object> headers; + private OutputStream os; + + public StreamingProviderWriterImpl(MessageBodyWriter<T> writer, + Class<?> cls, + Annotation[] anns, + MediaType mt, + MultivaluedMap<String, Object> headers, + OutputStream os) { + this.writer = writer; + this.cls = cls; + this.anns = anns; + this.mt = mt; + this.headers = headers; + this.os = os; + } + + @Override + public void write(T data) throws IOException { + writer.writeTo(data, cls, cls, anns, mt, headers, os); + + } + + @Override + public OutputStream getEntityStream() { + return os; + } + + } +} http://git-wip-us.apache.org/repos/asf/cxf/blob/3f1542f0/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/BookServerWebSocket.java ---------------------------------------------------------------------- diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/BookServerWebSocket.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/BookServerWebSocket.java index 5938f09..33c1b52 100644 --- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/BookServerWebSocket.java +++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/BookServerWebSocket.java @@ -23,6 +23,8 @@ import org.apache.cxf.Bus; import org.apache.cxf.BusFactory; import org.apache.cxf.jaxrs.JAXRSServerFactoryBean; import org.apache.cxf.jaxrs.lifecycle.SingletonResourceProvider; +import org.apache.cxf.jaxrs.provider.StreamingResponseProvider; +import org.apache.cxf.systest.jaxrs.Book; import org.apache.cxf.systest.jaxrs.BookStorePerRequest; import 
org.apache.cxf.testutil.common.AbstractBusTestServerBase; @@ -42,6 +44,7 @@ public class BookServerWebSocket extends AbstractBusTestServerBase { JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean(); sf.setBus(bus); sf.setResourceClasses(BookStoreWebSocket.class, BookStorePerRequest.class); + sf.setProvider(new StreamingResponseProvider<Book>()); sf.setResourceProvider(BookStoreWebSocket.class, new SingletonResourceProvider(new BookStoreWebSocket(), true)); sf.setAddress("ws://localhost:" + PORT + "/websocket"); http://git-wip-us.apache.org/repos/asf/cxf/blob/3f1542f0/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/BookStoreWebSocket.java ---------------------------------------------------------------------- diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/BookStoreWebSocket.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/BookStoreWebSocket.java index e658ca6..d2c70e1 100644 --- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/BookStoreWebSocket.java +++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/BookStoreWebSocket.java @@ -36,6 +36,7 @@ import javax.ws.rs.WebApplicationException; import javax.ws.rs.core.Context; import javax.ws.rs.core.StreamingOutput; +import org.apache.cxf.jaxrs.ext.StreamingResponse; import org.apache.cxf.systest.jaxrs.Book; @Path("/web/bookstore") @@ -99,6 +100,29 @@ public class BookStoreWebSocket { }; } + @GET + @Path("/bookstream") + @Produces("application/json") + public StreamingResponse<Book> getBookStream() { + return new StreamingResponse<Book>() { + public void writeTo(final StreamingResponse.Writer<Book> out) throws IOException { + out.write(new Book("WebSocket1", 1)); + executor.execute(new Runnable() { + public void run() { + try { + for (int i = 2; i <= 5; i++) { + Thread.sleep(500); + out.write(new Book("WebSocket" + i, i)); + } + } catch (Exception e) { + e.printStackTrace(); + } + } + }); + } + }; + 
} + } http://git-wip-us.apache.org/repos/asf/cxf/blob/3f1542f0/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/JAXRSClientServerWebSocketTest.java ---------------------------------------------------------------------- diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/JAXRSClientServerWebSocketTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/JAXRSClientServerWebSocketTest.java index df0b5aa..1c32f85 100644 --- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/JAXRSClientServerWebSocketTest.java +++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/websocket/JAXRSClientServerWebSocketTest.java @@ -128,6 +128,39 @@ public class JAXRSClientServerWebSocketTest extends AbstractBusClientServerTestB } @Test + public void testGetBookStream() throws Exception { + String address = "ws://localhost:" + getPort() + "/websocket/web/bookstore"; + + WebSocketTestClient wsclient = new WebSocketTestClient(address); + wsclient.connect(); + try { + wsclient.reset(5); + wsclient.sendMessage( + "GET /websocket/web/bookstore/bookstream\r\nAccept: application/json\r\n\r\n".getBytes()); + assertTrue("response expected", wsclient.await(5)); + List<WebSocketTestClient.Response> received = wsclient.getReceivedResponses(); + assertEquals(5, received.size()); + WebSocketTestClient.Response resp = received.get(0); + assertEquals(200, resp.getStatusCode()); + assertEquals("application/json", resp.getContentType()); + String value = resp.getTextEntity(); + assertEquals(value, getBookJson(1)); + for (int i = 2; i <= 5; i++) { + // subsequent data should not carry the headers nor the status. 
+ resp = received.get(i - 1); + assertEquals(0, resp.getStatusCode()); + assertEquals(resp.getTextEntity(), getBookJson(i)); + } + } finally { + wsclient.close(); + } + } + + private String getBookJson(int index) { + return "{\"Book\":{\"id\":" + index + ",\"name\":\"WebSocket" + index + "\"}}"; + } + + @Test public void testBookWithWebSocketAndHTTP() throws Exception { String address = "ws://localhost:" + getPort() + "/websocket/web/bookstore"; http://git-wip-us.apache.org/repos/asf/cxf/blob/3f1542f0/systests/jaxrs/src/test/resources/jaxrs_websocket/WEB-INF/beans.xml ---------------------------------------------------------------------- diff --git a/systests/jaxrs/src/test/resources/jaxrs_websocket/WEB-INF/beans.xml b/systests/jaxrs/src/test/resources/jaxrs_websocket/WEB-INF/beans.xml index 3d79ea0..9f4d006 100644 --- a/systests/jaxrs/src/test/resources/jaxrs_websocket/WEB-INF/beans.xml +++ b/systests/jaxrs/src/test/resources/jaxrs_websocket/WEB-INF/beans.xml @@ -40,6 +40,9 @@ <jaxrs:serviceBeans> <ref bean="serviceBean"/> </jaxrs:serviceBeans> + <jaxrs:providers> + <bean class="org.apache.cxf.jaxrs.provider.StreamingResponseProvider"/> + </jaxrs:providers> </jaxrs:server> <jaxrs:server id="bookserviceHTTP" address="/http"> <jaxrs:serviceBeans> http://git-wip-us.apache.org/repos/asf/cxf/blob/3f1542f0/systests/jaxrs/src/test/resources/jaxrs_websocket/beans-embedded.xml ---------------------------------------------------------------------- diff --git a/systests/jaxrs/src/test/resources/jaxrs_websocket/beans-embedded.xml b/systests/jaxrs/src/test/resources/jaxrs_websocket/beans-embedded.xml index fb4206b..b722ea1 100644 --- a/systests/jaxrs/src/test/resources/jaxrs_websocket/beans-embedded.xml +++ b/systests/jaxrs/src/test/resources/jaxrs_websocket/beans-embedded.xml @@ -40,6 +40,9 @@ <jaxrs:serviceBeans> <ref bean="serviceBean"/> </jaxrs:serviceBeans> + <jaxrs:providers> + <bean class="org.apache.cxf.jaxrs.provider.StreamingResponseProvider"/> + </jaxrs:providers> 
</jaxrs:server> </beans> <!-- END SNIPPET: beans -->
