Repository: cxf Updated Branches: refs/heads/master 70317dafc -> 58b379bdf
[CXF-6882] Adding CXF UseNioWrite extension - supported for JAXRS only at the moment Project: http://git-wip-us.apache.org/repos/asf/cxf/repo Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/59436ead Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/59436ead Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/59436ead Branch: refs/heads/master Commit: 59436eadca147ba937a24a4f4fc6ced93cc0bc7f Parents: 70317da Author: Sergey Beryozkin <[email protected]> Authored: Wed Dec 7 15:59:14 2016 +0000 Committer: Sergey Beryozkin <[email protected]> Committed: Wed Dec 7 15:59:14 2016 +0000 ---------------------------------------------------------------------- .../cxf/jaxrs/nio/NioWriteListenerImpl.java | 7 +- .../cxf/jaxrs/provider/BinaryDataProvider.java | 81 +++++++++++++++++++- .../cxf/systest/jaxrs/nio/NioBookStore.java | 13 +++- .../cxf/systest/jaxrs/nio/NioBookStoreTest.java | 14 ++++ 4 files changed, 108 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cxf/blob/59436ead/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/NioWriteListenerImpl.java ---------------------------------------------------------------------- diff --git a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/NioWriteListenerImpl.java b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/NioWriteListenerImpl.java index aed2cec..d35b4dd 100644 --- a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/NioWriteListenerImpl.java +++ b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/NioWriteListenerImpl.java @@ -34,7 +34,7 @@ public final class NioWriteListenerImpl implements WriteListener { private final NioWriteEntity entity; private final DelegatingNioOutputStream out; - NioWriteListenerImpl(Continuation cont, NioWriteEntity entity, OutputStream out) { + public NioWriteListenerImpl(Continuation cont, NioWriteEntity entity, OutputStream out) { this.cont = cont; this.entity = entity; this.out = new DelegatingNioOutputStream(out); @@ -44,6 +44,11 @@ public final class NioWriteListenerImpl implements WriteListener { public void onWritePossible() throws IOException { while (cont.isReadyForWrite()) { if (!entity.getWriter().write(out)) { + // REVISIT: + // Immediately closing the async context with cont.resume() works better + // at the moment - with cont.resume() Jetty throws NPE in its internal code + // which is quite possibly a Jetty bug. + // Do we really need to complete the out chain after the response has been written out ? cont.resume(); return; } http://git-wip-us.apache.org/repos/asf/cxf/blob/59436ead/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/provider/BinaryDataProvider.java ---------------------------------------------------------------------- diff --git a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/provider/BinaryDataProvider.java b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/provider/BinaryDataProvider.java index 980b076..f3e9353 100644 --- a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/provider/BinaryDataProvider.java +++ b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/provider/BinaryDataProvider.java @@ -38,19 +38,31 @@ import java.security.DigestInputStream; import java.util.UUID; import java.util.logging.Logger; +import javax.servlet.WriteListener; +import javax.ws.rs.WebApplicationException; import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.MultivaluedMap; +import javax.ws.rs.core.NioOutputStream; +import javax.ws.rs.core.NioWriterHandler; import javax.ws.rs.core.StreamingOutput; import javax.ws.rs.ext.MessageBodyReader; import javax.ws.rs.ext.MessageBodyWriter; +import org.apache.cxf.annotations.UseNioWrite; import org.apache.cxf.common.logging.LogUtils; import org.apache.cxf.common.util.MessageDigestInputStream; +import org.apache.cxf.continuations.Continuation; +import org.apache.cxf.continuations.ContinuationProvider; import org.apache.cxf.helpers.FileUtils; import org.apache.cxf.helpers.IOUtils; import org.apache.cxf.jaxrs.impl.HttpHeadersImpl; +import org.apache.cxf.jaxrs.nio.DelegatingNioOutputStream; +import org.apache.cxf.jaxrs.nio.NioWriteEntity; +import org.apache.cxf.jaxrs.nio.NioWriteListenerImpl; +import org.apache.cxf.jaxrs.utils.AnnotationUtils; import org.apache.cxf.jaxrs.utils.ExceptionUtils; +import org.apache.cxf.jaxrs.utils.JAXRSUtils; import org.apache.cxf.message.Message; import org.apache.cxf.message.MessageUtils; import org.apache.cxf.phase.PhaseInterceptorChain; @@ -151,12 +163,12 @@ public class BinaryDataProvider<T> extends AbstractConfigurableProvider throws IOException { if (InputStream.class.isAssignableFrom(o.getClass())) { - copyInputToOutput((InputStream)o, os, headers); + copyInputToOutput((InputStream)o, os, annotations, headers); } else if (File.class.isAssignableFrom(o.getClass())) { copyInputToOutput(new BufferedInputStream( - new FileInputStream((File)o)), os, headers); + new FileInputStream((File)o)), os, annotations, headers); } else if (byte[].class.isAssignableFrom(o.getClass())) { - copyInputToOutput(new ByteArrayInputStream((byte[])o), os, headers); + copyInputToOutput(new ByteArrayInputStream((byte[])o), os, annotations, headers); } else if (Reader.class.isAssignableFrom(o.getClass())) { try { Writer writer = new OutputStreamWriter(os, getEncoding(type)); @@ -185,19 +197,48 @@ public class BinaryDataProvider<T> extends AbstractConfigurableProvider } protected void copyInputToOutput(InputStream is, OutputStream os, - MultivaluedMap<String, Object> outHeaders) throws IOException { + Annotation[] anns, MultivaluedMap<String, Object> outHeaders) throws IOException { if (isRangeSupported()) { Message inMessage = PhaseInterceptorChain.getCurrentMessage().getExchange().getInMessage(); handleRangeRequest(is, os, new HttpHeadersImpl(inMessage), outHeaders); } else { + boolean nioWrite = AnnotationUtils.getAnnotation(anns, UseNioWrite.class) != null; + if (nioWrite) { + ContinuationProvider provider = getContinuationProvider(); + if (provider != null) { + copyUsingNio(is, os, provider.getContinuation()); + } + return; + } if (closeResponseInputStream) { IOUtils.copyAndCloseInput(is, os, bufferSize); } else { IOUtils.copy(is, os, bufferSize); } + } } + protected void copyUsingNio(InputStream is, OutputStream os, Continuation cont) { + NioWriteListenerImpl listener = + new NioWriteListenerImpl(cont, + new NioWriteEntity(getNioHandler(is), null), + new DelegatingNioOutputStream(os)); + Message m = JAXRSUtils.getCurrentMessage(); + m.put(WriteListener.class, listener); + // After this MBW registers the listener, JAXRSOutInterceptor is done, and the + // out chain will need to be resumed from the interceptor which follows it + m.put("suspend.chain.on.current.interceptor", Boolean.TRUE); + cont.suspend(0); + } + + private ContinuationProvider getContinuationProvider() { + return (ContinuationProvider)JAXRSUtils.getCurrentMessage().getExchange() + .getInMessage().get(ContinuationProvider.class.getName()); + } + + + protected void handleRangeRequest(InputStream is, OutputStream os, HttpHeaders inHeaders, @@ -231,4 +272,36 @@ public class BinaryDataProvider<T> extends AbstractConfigurableProvider public void setBufferSize(int bufferSize) { this.bufferSize = bufferSize; } + + protected NioWriterHandler getNioHandler(final InputStream in) { + + return new NioWriterHandler() { + final byte[] buffer = new byte[bufferSize]; + @Override + public boolean write(NioOutputStream out) { + try { + final int n = in.read(buffer); + + if (n >= 0) { + out.write(buffer, 0, n); + return true; + } + if (closeResponseInputStream) { + try { + in.close(); + } catch (IOException ex) { + /* do nothing */ + } + } + + return false; + } catch (IOException ex) { + throw new WebApplicationException(ex); + } + + } + }; + + + } } http://git-wip-us.apache.org/repos/asf/cxf/blob/59436ead/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/nio/NioBookStore.java ---------------------------------------------------------------------- diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/nio/NioBookStore.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/nio/NioBookStore.java index bd885f5..ce2507a 100644 --- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/nio/NioBookStore.java +++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/nio/NioBookStore.java @@ -20,22 +20,23 @@ package org.apache.cxf.systest.jaxrs.nio; import java.io.ByteArrayInputStream; import java.io.IOException; +import java.io.InputStream; import javax.ws.rs.GET; import javax.ws.rs.Path; import javax.ws.rs.Produces; -import javax.ws.rs.QueryParam; import javax.ws.rs.WebApplicationException; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; +import org.apache.cxf.annotations.UseNioWrite; import org.apache.cxf.helpers.IOUtils; @Path("/bookstore") public class NioBookStore { @GET @Produces(MediaType.TEXT_PLAIN) - public Response readBooks(@QueryParam("path") String path) throws IOException { + public Response readBooks() throws IOException { final ByteArrayInputStream in = new ByteArrayInputStream( IOUtils.readBytesFromStream(getClass().getResourceAsStream("/files/books.txt"))); final byte[] buffer = new byte[4096]; @@ -66,4 +67,12 @@ public class NioBookStore { } ).build(); } + + @GET + @Produces(MediaType.TEXT_PLAIN) + @Path("/is") + @UseNioWrite + public InputStream readBooksFromInputStream() throws IOException { + return getClass().getResourceAsStream("/files/books.txt"); + } } http://git-wip-us.apache.org/repos/asf/cxf/blob/59436ead/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/nio/NioBookStoreTest.java ---------------------------------------------------------------------- diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/nio/NioBookStoreTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/nio/NioBookStoreTest.java index 19b0830..c1743e3 100644 --- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/nio/NioBookStoreTest.java +++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/nio/NioBookStoreTest.java @@ -58,6 +58,20 @@ public class NioBookStoreTest extends AbstractBusClientServerTestBase { } } + @Test + public void testGetAllBooksIs() throws Exception { + final Response response = createWebClient("/bookstore/is", MediaType.TEXT_PLAIN).get(); + + try { + assertEquals(200, response.getStatus()); + + assertThat(response.readEntity(String.class), equalTo(IOUtils.readStringFromStream( + getClass().getResourceAsStream("/files/books.txt")))); + } finally { + response.close(); + } + } + protected WebClient createWebClient(final String url, final String mediaType) { final List< ? > providers = Arrays.asList(new JacksonJsonProvider());
