Repository: cxf Updated Branches: refs/heads/master b5355adf6 -> 08e8316aa
[CXF-6882] Initial JAX-RS NIO Write support, code is based on Andriy Redko's branch code with minor tweaks Project: http://git-wip-us.apache.org/repos/asf/cxf/repo Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/08e8316a Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/08e8316a Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/08e8316a Branch: refs/heads/master Commit: 08e8316aa5c575d6117d20c4054405e1ea0e2887 Parents: b5355ad Author: Sergey Beryozkin <[email protected]> Authored: Tue Dec 6 11:00:17 2016 +0000 Committer: Sergey Beryozkin <[email protected]> Committed: Tue Dec 6 11:00:17 2016 +0000 ---------------------------------------------------------------------- .../apache/cxf/continuations/Continuation.java | 2 + .../interceptor/OutgoingChainInterceptor.java | 4 + .../apache/cxf/phase/PhaseInterceptorChain.java | 13 +++- .../cxf/jaxrs/impl/ResponseBuilderImpl.java | 10 +-- .../jaxrs/nio/DelegatingNioOutputStream.java | 57 ++++++++++++++ .../cxf/jaxrs/nio/NioMessageBodyWriter.java | 74 ++++++++++++++++++ .../apache/cxf/jaxrs/nio/NioWriteEntity.java | 41 ++++++++++ .../cxf/jaxrs/nio/NioWriteListenerImpl.java | 63 +++++++++++++++ .../jaxrs/provider/ServerProviderFactory.java | 5 +- .../cxf/jaxrs/provider/ProviderFactoryTest.java | 14 ++-- .../continuations/JettyContinuationWrapper.java | 5 ++ .../http/Servlet3ContinuationProvider.java | 26 ++++++- .../jms/continuations/JMSContinuation.java | 5 ++ .../cxf/systest/jaxrs/nio/NioBookStore.java | 69 +++++++++++++++++ .../systest/jaxrs/nio/NioBookStoreServer.java | 81 ++++++++++++++++++++ .../cxf/systest/jaxrs/nio/NioBookStoreTest.java | 72 +++++++++++++++++ .../jaxrs/src/test/resources/files/books.txt | 4 + .../test/resources/jaxrs_nio/WEB-INF/beans.xml | 42 ++++++++++ .../test/resources/jaxrs_nio/WEB-INF/web.xml | 46 +++++++++++ 19 files changed, 617 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/cxf/blob/08e8316a/core/src/main/java/org/apache/cxf/continuations/Continuation.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/cxf/continuations/Continuation.java b/core/src/main/java/org/apache/cxf/continuations/Continuation.java index d33140c..b3be6fd 100644 --- a/core/src/main/java/org/apache/cxf/continuations/Continuation.java +++ b/core/src/main/java/org/apache/cxf/continuations/Continuation.java @@ -76,4 +76,6 @@ public interface Continuation { * @param o An arbitrary object to associate with the continuation */ void setObject(Object o); + + boolean isReadyForWrite(); } http://git-wip-us.apache.org/repos/asf/cxf/blob/08e8316a/core/src/main/java/org/apache/cxf/interceptor/OutgoingChainInterceptor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/cxf/interceptor/OutgoingChainInterceptor.java b/core/src/main/java/org/apache/cxf/interceptor/OutgoingChainInterceptor.java index 73c49d2..2382a25 100644 --- a/core/src/main/java/org/apache/cxf/interceptor/OutgoingChainInterceptor.java +++ b/core/src/main/java/org/apache/cxf/interceptor/OutgoingChainInterceptor.java @@ -79,8 +79,12 @@ public class OutgoingChainInterceptor extends AbstractPhaseInterceptor<Message> if (outChain == null) { outChain = OutgoingChainInterceptor.getChain(ex, chainCache); out.setInterceptorChain(outChain); + } else if (outChain.getState() == InterceptorChain.State.PAUSED) { + outChain.resume(); + return; } outChain.doIntercept(out); + } } http://git-wip-us.apache.org/repos/asf/cxf/blob/08e8316a/core/src/main/java/org/apache/cxf/phase/PhaseInterceptorChain.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/cxf/phase/PhaseInterceptorChain.java b/core/src/main/java/org/apache/cxf/phase/PhaseInterceptorChain.java index 99811fc..2ed58d1 100644 --- 
a/core/src/main/java/org/apache/cxf/phase/PhaseInterceptorChain.java +++ b/core/src/main/java/org/apache/cxf/phase/PhaseInterceptorChain.java @@ -32,6 +32,7 @@ import java.util.logging.Level; import java.util.logging.Logger; import org.apache.cxf.common.logging.LogUtils; +import org.apache.cxf.common.util.PropertyUtils; import org.apache.cxf.common.util.StringUtils; import org.apache.cxf.continuations.SuspendedInvocationException; import org.apache.cxf.interceptor.Fault; @@ -312,8 +313,16 @@ public class PhaseInterceptorChain implements InterceptorChain { } } catch (SuspendedInvocationException ex) { - // we need to resume from the same interceptor the exception got originated from - if (iterator.hasPrevious()) { + + // Moving the chain iterator to the previous interceptor is needed + // for the invocation to be resumed from the same interceptor which + // suspended the invocation. + // If "suspend.chain.on.current.interceptor" is set to true then + // the chain will be resumed from the interceptor which follows + // the interceptor which suspended the invocation. 
+ Object suspendProp = message.remove("suspend.chain.on.current.interceptor"); + if ((suspendProp == null || PropertyUtils.isFalse(suspendProp)) + && iterator.hasPrevious()) { iterator.previous(); } pause(); http://git-wip-us.apache.org/repos/asf/cxf/blob/08e8316a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/ResponseBuilderImpl.java ---------------------------------------------------------------------- diff --git a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/ResponseBuilderImpl.java b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/ResponseBuilderImpl.java index 29c5c42..78797a8 100644 --- a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/ResponseBuilderImpl.java +++ b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/ResponseBuilderImpl.java @@ -41,6 +41,7 @@ import javax.ws.rs.core.Response.ResponseBuilder; import javax.ws.rs.core.UriInfo; import javax.ws.rs.core.Variant; +import org.apache.cxf.jaxrs.nio.NioWriteEntity; import org.apache.cxf.message.Message; import org.apache.cxf.phase.PhaseInterceptorChain; @@ -318,14 +319,13 @@ public class ResponseBuilderImpl extends ResponseBuilder implements Cloneable { } @Override - public ResponseBuilder entity(NioWriterHandler arg0) { - // TODO: Not Implemented - return this; + public ResponseBuilder entity(NioWriterHandler writerHandler) { + return entity(writerHandler, (NioErrorHandler)null); } @Override - public ResponseBuilder entity(NioWriterHandler arg0, NioErrorHandler arg1) { - // TODO: Not Implemented + public ResponseBuilder entity(NioWriterHandler writerHandler, NioErrorHandler errorHandler) { + entity = new NioWriteEntity(writerHandler, errorHandler); return this; } } http://git-wip-us.apache.org/repos/asf/cxf/blob/08e8316a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/DelegatingNioOutputStream.java ---------------------------------------------------------------------- diff --git 
a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/DelegatingNioOutputStream.java b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/DelegatingNioOutputStream.java new file mode 100644 index 0000000..29e05e1 --- /dev/null +++ b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/DelegatingNioOutputStream.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.cxf.jaxrs.nio; + +import java.io.IOException; +import java.io.OutputStream; + +import javax.ws.rs.core.NioOutputStream; + +public class DelegatingNioOutputStream extends NioOutputStream { + private final OutputStream out; + + public DelegatingNioOutputStream(final OutputStream out) { + this.out = out; + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + out.write(b, off, len); + } + + @Override + public void write(byte[] b) throws IOException { + out.write(b); + } + + @Override + public void write(int b) throws IOException { + out.write(b); + } + + @Override + public void flush() throws IOException { + out.flush(); + } + + @Override + public void close() throws IOException { + out.close(); + } +} http://git-wip-us.apache.org/repos/asf/cxf/blob/08e8316a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/NioMessageBodyWriter.java ---------------------------------------------------------------------- diff --git a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/NioMessageBodyWriter.java b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/NioMessageBodyWriter.java new file mode 100644 index 0000000..d4ab508 --- /dev/null +++ b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/NioMessageBodyWriter.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cxf.jaxrs.nio; + +import java.io.IOException; +import java.io.OutputStream; +import java.lang.annotation.Annotation; +import java.lang.reflect.Type; + +import javax.servlet.WriteListener; +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.MultivaluedMap; +import javax.ws.rs.ext.MessageBodyWriter; +import javax.ws.rs.ext.Provider; + +import org.apache.cxf.continuations.Continuation; +import org.apache.cxf.continuations.ContinuationProvider; +import org.apache.cxf.jaxrs.utils.JAXRSUtils; +import org.apache.cxf.message.Message; + +@Provider +public class NioMessageBodyWriter implements MessageBodyWriter<NioWriteEntity> { + + public NioMessageBodyWriter() { + } + + @Override + public boolean isWriteable(Class<?> cls, Type type, Annotation[] anns, MediaType mt) { + return NioWriteEntity.class.isAssignableFrom(cls); + } + + @Override + public void writeTo(NioWriteEntity entity, Class<?> cls, Type t, Annotation[] anns, + MediaType mt, MultivaluedMap<String, Object> headers, OutputStream os) + throws IOException, WebApplicationException { + Continuation cont = getContinuation(); + NioWriteListenerImpl listener = new NioWriteListenerImpl(cont, entity, os); + Message m = JAXRSUtils.getCurrentMessage(); + m.put(WriteListener.class, listener); + // After this MBW registers the listener, JAXRSOutInterceptor is done, and the + // out chain will need to be resumed from the interceptor which follows it + m.put("suspend.chain.on.current.interceptor", Boolean.TRUE); + cont.suspend(0); + 
} + + @Override + public long getSize(NioWriteEntity t, Class<?> type, Type genericType, Annotation[] annotations, + MediaType mediaType) { + return -1; + } + private Continuation getContinuation() { + ContinuationProvider provider = + (ContinuationProvider)JAXRSUtils.getCurrentMessage().getExchange() + .getInMessage().get(ContinuationProvider.class.getName()); + return provider.getContinuation(); + } +} http://git-wip-us.apache.org/repos/asf/cxf/blob/08e8316a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/NioWriteEntity.java ---------------------------------------------------------------------- diff --git a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/NioWriteEntity.java b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/NioWriteEntity.java new file mode 100644 index 0000000..3835bd6 --- /dev/null +++ b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/NioWriteEntity.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.cxf.jaxrs.nio; + +import javax.ws.rs.core.NioErrorHandler; +import javax.ws.rs.core.NioWriterHandler; + +public final class NioWriteEntity { + private final NioWriterHandler writer; + private final NioErrorHandler error; + + public NioWriteEntity(final NioWriterHandler writer, final NioErrorHandler error) { + this.writer = writer; + this.error = error; + } + + public NioWriterHandler getWriter() { + return writer; + } + + public NioErrorHandler getError() { + return error; + } +} + http://git-wip-us.apache.org/repos/asf/cxf/blob/08e8316a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/NioWriteListenerImpl.java ---------------------------------------------------------------------- diff --git a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/NioWriteListenerImpl.java b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/NioWriteListenerImpl.java new file mode 100644 index 0000000..aed2cec --- /dev/null +++ b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/nio/NioWriteListenerImpl.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.cxf.jaxrs.nio; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.logging.Logger; + +import javax.servlet.WriteListener; + +import org.apache.cxf.common.logging.LogUtils; +import org.apache.cxf.continuations.Continuation; +import org.apache.cxf.jaxrs.utils.ExceptionUtils; + +public final class NioWriteListenerImpl implements WriteListener { + private static final Logger LOG = LogUtils.getL7dLogger(NioWriteListenerImpl.class); + private Continuation cont; + private final NioWriteEntity entity; + private final DelegatingNioOutputStream out; + + NioWriteListenerImpl(Continuation cont, NioWriteEntity entity, OutputStream out) { + this.cont = cont; + this.entity = entity; + this.out = new DelegatingNioOutputStream(out); + } + + @Override + public void onWritePossible() throws IOException { + while (cont.isReadyForWrite()) { + if (!entity.getWriter().write(out)) { + cont.resume(); + return; + } + } + } + + @Override + public void onError(Throwable t) { + try { + entity.getError().error(t); + } catch (final Throwable ex) { + LOG.warning("NIO WriteListener error: " + ExceptionUtils.getStackTrace(ex)); + } finally { + cont.resume(); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cxf/blob/08e8316a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/provider/ServerProviderFactory.java ---------------------------------------------------------------------- diff --git a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/provider/ServerProviderFactory.java b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/provider/ServerProviderFactory.java index 8354f13..13cae5a 100644 --- a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/provider/ServerProviderFactory.java +++ b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/provider/ServerProviderFactory.java @@ -64,6 +64,7 @@ import org.apache.cxf.jaxrs.model.ClassResourceInfo; import org.apache.cxf.jaxrs.model.FilterProviderInfo; import 
org.apache.cxf.jaxrs.model.OperationResourceInfo; import org.apache.cxf.jaxrs.model.ProviderInfo; +import org.apache.cxf.jaxrs.nio.NioMessageBodyWriter; import org.apache.cxf.jaxrs.utils.AnnotationUtils; import org.apache.cxf.jaxrs.utils.InjectionUtils; import org.apache.cxf.jaxrs.utils.JAXRSUtils; @@ -119,7 +120,9 @@ public final class ServerProviderFactory extends ProviderFactory { } ServerProviderFactory factory = new ServerProviderFactory(bus); ProviderFactory.initFactory(factory); - factory.setProviders(false, false, new WebApplicationExceptionMapper()); + factory.setProviders(false, false, + new WebApplicationExceptionMapper(), + new NioMessageBodyWriter()); factory.setBusProviders(); return factory; } http://git-wip-us.apache.org/repos/asf/cxf/blob/08e8316a/rt/frontend/jaxrs/src/test/java/org/apache/cxf/jaxrs/provider/ProviderFactoryTest.java ---------------------------------------------------------------------- diff --git a/rt/frontend/jaxrs/src/test/java/org/apache/cxf/jaxrs/provider/ProviderFactoryTest.java b/rt/frontend/jaxrs/src/test/java/org/apache/cxf/jaxrs/provider/ProviderFactoryTest.java index 8da87d5..58a66b8 100644 --- a/rt/frontend/jaxrs/src/test/java/org/apache/cxf/jaxrs/provider/ProviderFactoryTest.java +++ b/rt/frontend/jaxrs/src/test/java/org/apache/cxf/jaxrs/provider/ProviderFactoryTest.java @@ -124,8 +124,8 @@ public class ProviderFactoryTest extends Assert { // writers List<ProviderInfo<MessageBodyWriter<?>>> writers = pf.getMessageWriters(); - assertEquals(9, writers.size()); - Object lastWriter = writers.get(8).getProvider(); + assertEquals(10, writers.size()); + Object lastWriter = writers.get(9).getProvider(); assertTrue(lastWriter instanceof StringTextProvider); //readers List<ProviderInfo<MessageBodyReader<?>>> readers = pf.getMessageReaders(); @@ -154,8 +154,8 @@ public class ProviderFactoryTest extends Assert { // writers List<ProviderInfo<MessageBodyWriter<?>>> writers = pf.getMessageWriters(); - assertEquals(9, writers.size()); 
- Object lastWriter = writers.get(8).getProvider(); + assertEquals(10, writers.size()); + Object lastWriter = writers.get(9).getProvider(); assertTrue(lastWriter instanceof StringTextProvider); //readers List<ProviderInfo<MessageBodyReader<?>>> readers = pf.getMessageReaders(); @@ -188,7 +188,7 @@ public class ProviderFactoryTest extends Assert { // writers List<ProviderInfo<MessageBodyWriter<?>>> writers = pf.getMessageWriters(); - assertEquals(9, writers.size()); + assertEquals(10, writers.size()); Object lastWriter = writers.get(8).getProvider(); assertFalse(lastWriter instanceof StringTextProvider); //readers @@ -221,8 +221,8 @@ public class ProviderFactoryTest extends Assert { // writers List<ProviderInfo<MessageBodyWriter<?>>> writers = pf.getMessageWriters(); - assertEquals(9, writers.size()); - Object lastWriter = writers.get(8).getProvider(); + assertEquals(10, writers.size()); + Object lastWriter = writers.get(9).getProvider(); assertTrue(lastWriter instanceof StringTextProvider); //readers List<ProviderInfo<MessageBodyReader<?>>> readers = pf.getMessageReaders(); http://git-wip-us.apache.org/repos/asf/cxf/blob/08e8316a/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationWrapper.java ---------------------------------------------------------------------- diff --git a/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationWrapper.java b/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationWrapper.java index 3c83170..f3c3d96 100644 --- a/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationWrapper.java +++ b/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationWrapper.java @@ -138,5 +138,10 @@ public class JettyContinuationWrapper implements Continuation, ContinuationListe pendingTimeout = 0; 
isResumed = true; } + + @Override + public boolean isReadyForWrite() { + return true; + } } http://git-wip-us.apache.org/repos/asf/cxf/blob/08e8316a/rt/transports/http/src/main/java/org/apache/cxf/transport/http/Servlet3ContinuationProvider.java ---------------------------------------------------------------------- diff --git a/rt/transports/http/src/main/java/org/apache/cxf/transport/http/Servlet3ContinuationProvider.java b/rt/transports/http/src/main/java/org/apache/cxf/transport/http/Servlet3ContinuationProvider.java index 51be32a..a3a6142 100644 --- a/rt/transports/http/src/main/java/org/apache/cxf/transport/http/Servlet3ContinuationProvider.java +++ b/rt/transports/http/src/main/java/org/apache/cxf/transport/http/Servlet3ContinuationProvider.java @@ -24,6 +24,8 @@ import java.io.IOException; import javax.servlet.AsyncContext; import javax.servlet.AsyncEvent; import javax.servlet.AsyncListener; +import javax.servlet.ServletOutputStream; +import javax.servlet.WriteListener; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; @@ -32,6 +34,7 @@ import org.apache.cxf.continuations.Continuation; import org.apache.cxf.continuations.ContinuationCallback; import org.apache.cxf.continuations.ContinuationProvider; import org.apache.cxf.message.Message; +import org.apache.cxf.phase.PhaseInterceptorChain; /** * @@ -116,7 +119,16 @@ public class Servlet3ContinuationProvider implements ContinuationProvider { isResumed = false; context.setTimeout(timeout); - inMessage.getExchange().getInMessage().getInterceptorChain().suspend(); + + Message currentMessage = PhaseInterceptorChain.getCurrentMessage(); + if (currentMessage.get(WriteListener.class) != null) { + // CXF Continuation WriteListener will likely need to be introduced + // for NIO supported with non-Servlet specific mechanisms + getOutputStream().setWriteListener(currentMessage.get(WriteListener.class)); + currentMessage.getInterceptorChain().suspend(); + } else { + 
inMessage.getExchange().getInMessage().getInterceptorChain().suspend(); + } return true; } @@ -216,6 +228,18 @@ public class Servlet3ContinuationProvider implements ContinuationProvider { return false; } } + + @Override + public boolean isReadyForWrite() { + return getOutputStream().isReady(); + } + private ServletOutputStream getOutputStream() { + try { + return resp.getOutputStream(); + } catch (IOException ex) { + throw new RuntimeException(ex); + } + } } } http://git-wip-us.apache.org/repos/asf/cxf/blob/08e8316a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuation.java ---------------------------------------------------------------------- diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuation.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuation.java index b56ee84..243d4ae 100644 --- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuation.java +++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuation.java @@ -157,5 +157,10 @@ public class JMSContinuation implements Continuation { isCanceled = true; } + @Override + public boolean isReadyForWrite() { + return true; + } + } http://git-wip-us.apache.org/repos/asf/cxf/blob/08e8316a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/nio/NioBookStore.java ---------------------------------------------------------------------- diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/nio/NioBookStore.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/nio/NioBookStore.java new file mode 100644 index 0000000..bd885f5 --- /dev/null +++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/nio/NioBookStore.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cxf.systest.jaxrs.nio; + +import java.io.ByteArrayInputStream; +import java.io.IOException; + +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; + +import org.apache.cxf.helpers.IOUtils; + +@Path("/bookstore") +public class NioBookStore { + @GET + @Produces(MediaType.TEXT_PLAIN) + public Response readBooks(@QueryParam("path") String path) throws IOException { + final ByteArrayInputStream in = new ByteArrayInputStream( + IOUtils.readBytesFromStream(getClass().getResourceAsStream("/files/books.txt"))); + final byte[] buffer = new byte[4096]; + + return Response.ok().entity( + out -> { + try { + final int n = in.read(buffer); + + if (n >= 0) { + out.write(buffer, 0, n); + return true; + } + + try { + in.close(); + } catch (IOException ex) { + /* do nothing */ + } + + return false; + } catch (IOException ex) { + throw new WebApplicationException(ex); + } + }, + throwable -> { + throw throwable; + } + ).build(); + } +} http://git-wip-us.apache.org/repos/asf/cxf/blob/08e8316a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/nio/NioBookStoreServer.java 
---------------------------------------------------------------------- diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/nio/NioBookStoreServer.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/nio/NioBookStoreServer.java new file mode 100644 index 0000000..5e00e50 --- /dev/null +++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/nio/NioBookStoreServer.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.cxf.systest.jaxrs.nio; + +import java.net.URISyntaxException; + +import org.apache.cxf.testutil.common.AbstractBusTestServerBase; +import org.eclipse.jetty.server.Handler; +import org.eclipse.jetty.server.handler.DefaultHandler; +import org.eclipse.jetty.server.handler.HandlerCollection; +import org.eclipse.jetty.webapp.WebAppContext; + + +public class NioBookStoreServer extends AbstractBusTestServerBase { + static final String PORT = allocatePort(NioBookStoreServer.class); + + private org.eclipse.jetty.server.Server server; + + protected void run() { + server = new org.eclipse.jetty.server.Server(Integer.parseInt(PORT)); + + WebAppContext webappcontext = new WebAppContext(); + String contextPath = null; + try { + contextPath = getClass().getResource("/jaxrs_nio").toURI().getPath(); + } catch (URISyntaxException e1) { + e1.printStackTrace(); + } + webappcontext.setContextPath("/"); + + webappcontext.setWar(contextPath); + + HandlerCollection handlers = new HandlerCollection(); + handlers.setHandlers(new Handler[] {webappcontext, new DefaultHandler()}); + + server.setHandler(handlers); + try { + server.start(); + + } catch (Exception e) { + e.printStackTrace(); + } + } + public void tearDown() throws Exception { + super.tearDown(); + if (server != null) { + server.stop(); + server.destroy(); + server = null; + } + } + + public static void main(String args[]) { + try { + NioBookStoreServer s = new NioBookStoreServer(); + s.start(); + } catch (Exception ex) { + ex.printStackTrace(); + System.exit(-1); + } finally { + System.out.println("done!"); + } + } +} http://git-wip-us.apache.org/repos/asf/cxf/blob/08e8316a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/nio/NioBookStoreTest.java ---------------------------------------------------------------------- diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/nio/NioBookStoreTest.java 
b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/nio/NioBookStoreTest.java
new file mode 100644
index 0000000..19b0830
--- /dev/null
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/nio/NioBookStoreTest.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.jaxrs.nio;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.List;
+
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
+
+import org.apache.cxf.helpers.IOUtils;
+import org.apache.cxf.jaxrs.client.WebClient;
+import org.apache.cxf.jaxrs.model.AbstractResourceInfo;
+import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+
+/**
+ * System test for the NIO book store: requests "/bookstore" as text/plain from the
+ * server started via {@link NioBookStoreServer} and compares the response body with
+ * the "/files/books.txt" classpath resource.
+ */
+public class NioBookStoreTest extends AbstractBusClientServerTestBase {
+    /** Classpath location of the text the service response is compared against. */
+    private static final String BOOKS_RESOURCE = "/files/books.txt";
+
+    /**
+     * Client receive timeout in milliseconds (about 2.8 hours).
+     * NOTE(review): presumably this large so a debugger can be attached - confirm before lowering.
+     */
+    private static final long RECEIVE_TIMEOUT_MS = 10000000L;
+
+    /** Launches {@link NioBookStoreServer} and creates the static bus. */
+    @BeforeClass
+    public static void startServers() throws Exception {
+        AbstractResourceInfo.clearAllMaps();
+        assertTrue("server did not launch correctly", launchServer(NioBookStoreServer.class, true));
+        createStaticBus();
+    }
+
+    /** Verifies that GET /bookstore answers 200 with exactly the content of the books resource. */
+    @Test
+    public void testGetAllBooks() throws Exception {
+        final String expectedBooks = readExpectedBooks();
+        final Response response = createWebClient("/bookstore", MediaType.TEXT_PLAIN).get();
+
+        try {
+            assertEquals(200, response.getStatus());
+            assertThat(response.readEntity(String.class), equalTo(expectedBooks));
+        } finally {
+            response.close();
+        }
+    }
+
+    /**
+     * Creates a client for the test server.
+     *
+     * @param url path appended to {@code http://localhost:<port>}
+     * @param mediaType media type passed to {@code accept}
+     * @return the configured client
+     */
+    protected WebClient createWebClient(final String url, final String mediaType) {
+        final List< ? > providers = Arrays.asList(new JacksonJsonProvider());
+
+        final WebClient wc = WebClient
+            .create("http://localhost:" + NioBookStoreServer.PORT + url, providers)
+            .accept(mediaType);
+
+        WebClient.getConfig(wc).getHttpConduit().getClient().setReceiveTimeout(RECEIVE_TIMEOUT_MS);
+        return wc;
+    }
+
+    /**
+     * Reads the expected response body from the classpath.
+     *
+     * @return the content of the books resource
+     * @throws IOException if the resource cannot be read
+     */
+    private String readExpectedBooks() throws IOException {
+        final InputStream in = getClass().getResourceAsStream(BOOKS_RESOURCE);
+        // Fail with a clear message instead of an NPE in the reader when the resource is missing.
+        assertNotNull("test resource not found: " + BOOKS_RESOURCE, in);
+        try {
+            return IOUtils.readStringFromStream(in);
+        } finally {
+            // Release the stream even if reading fails part-way through.
+            in.close();
+        }
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/cxf/blob/08e8316a/systests/jaxrs/src/test/resources/files/books.txt
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/resources/files/books.txt b/systests/jaxrs/src/test/resources/files/books.txt
new file mode 100644
index 0000000..6f23352
--- /dev/null
+++ b/systests/jaxrs/src/test/resources/files/books.txt
@@ -0,0 +1,4 @@
+Molecular Breeding and Nutritional Aspects of Buckwheat
+{Progress in Heterocyclic Chemistry, Volume 28}
+Pharmacology and Therapeutics for Dentistry
+Plotkin's Vaccines

http://git-wip-us.apache.org/repos/asf/cxf/blob/08e8316a/systests/jaxrs/src/test/resources/jaxrs_nio/WEB-INF/beans.xml
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/resources/jaxrs_nio/WEB-INF/beans.xml b/systests/jaxrs/src/test/resources/jaxrs_nio/WEB-INF/beans.xml
new file mode 100644
index 0000000..5d24b1f
--- /dev/null
+++ b/systests/jaxrs/src/test/resources/jaxrs_nio/WEB-INF/beans.xml
@@ -0,0 +1,42 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements. See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership. The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.
You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied. See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xmlns:jaxrs="http://cxf.apache.org/jaxrs"
+  xmlns:core="http://cxf.apache.org/core"
+  xsi:schemaLocation="
+http://www.springframework.org/schema/beans
+http://www.springframework.org/schema/beans/spring-beans-4.3.xsd
+http://cxf.apache.org/core
+http://cxf.apache.org/schemas/core.xsd
+http://cxf.apache.org/jaxrs
+http://cxf.apache.org/schemas/jaxrs.xsd">
+    <import resource="classpath:/META-INF/cxf/cxf.xml"/>
+
+    <jaxrs:server id="bookservice"
+        address="/">
+        <jaxrs:serviceBeans>
+            <ref bean="bookstore"/>
+        </jaxrs:serviceBeans>
+    </jaxrs:server>
+
+    <bean id="bookstore" class="org.apache.cxf.systest.jaxrs.nio.NioBookStore"/>
+
+</beans>

http://git-wip-us.apache.org/repos/asf/cxf/blob/08e8316a/systests/jaxrs/src/test/resources/jaxrs_nio/WEB-INF/web.xml
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/resources/jaxrs_nio/WEB-INF/web.xml b/systests/jaxrs/src/test/resources/jaxrs_nio/WEB-INF/web.xml
new file mode 100644
index 0000000..3e984d3
--- /dev/null
+++ b/systests/jaxrs/src/test/resources/jaxrs_nio/WEB-INF/web.xml
@@ -0,0 +1,54 @@
+<?xml version="1.0" encoding="ISO-8859-1"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements. See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership. The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied. See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+<!--
+  This descriptor is declared as Servlet 3.0 because the async-supported element used
+  below is not part of the Servlet 2.3 DTD. metadata-complete="true" tells the container
+  not to scan for annotations or web fragments.
+-->
+<!-- START SNIPPET: webxml -->
+<web-app xmlns="http://java.sun.com/xml/ns/javaee"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd"
+    version="3.0"
+    metadata-complete="true">
+    <context-param>
+        <param-name>contextConfigLocation</param-name>
+        <param-value>WEB-INF/beans.xml</param-value>
+    </context-param>
+    <listener>
+        <listener-class>
+            org.springframework.web.context.ContextLoaderListener
+        </listener-class>
+    </listener>
+    <servlet>
+        <display-name>CXF Servlet</display-name>
+        <servlet-name>CXFServlet</servlet-name>
+        <servlet-class>
+            org.apache.cxf.transport.servlet.CXFServlet
+        </servlet-class>
+        <load-on-startup>1</load-on-startup>
+        <async-supported>true</async-supported>
+    </servlet>
+    <servlet-mapping>
+        <servlet-name>CXFServlet</servlet-name>
+        <url-pattern>/*</url-pattern>
+    </servlet-mapping>
+</web-app>
+<!-- END SNIPPET: webxml -->
