Don't cache the input stream unless we have gotten through the service invoke phase. If there is an exception or similar failure on input, there is no need to cache it; just discard what we can.
Project: http://git-wip-us.apache.org/repos/asf/cxf/repo Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/643b1bc7 Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/643b1bc7 Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/643b1bc7 Branch: refs/heads/2.7.x-fixes Commit: 643b1bc7320ca90c3e078e50509f9a30a0ab45be Parents: d2578fe Author: Daniel Kulp <[email protected]> Authored: Tue Mar 25 13:16:04 2014 -0400 Committer: Daniel Kulp <[email protected]> Committed: Tue Mar 25 13:32:13 2014 -0400 ---------------------------------------------------------------------- .../interceptor/OutgoingChainInterceptor.java | 4 +++ .../transport/http/AbstractHTTPDestination.java | 36 +++++++++++++------- 2 files changed, 28 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cxf/blob/643b1bc7/api/src/main/java/org/apache/cxf/interceptor/OutgoingChainInterceptor.java ---------------------------------------------------------------------- diff --git a/api/src/main/java/org/apache/cxf/interceptor/OutgoingChainInterceptor.java b/api/src/main/java/org/apache/cxf/interceptor/OutgoingChainInterceptor.java index 831230e..5bd5802 100644 --- a/api/src/main/java/org/apache/cxf/interceptor/OutgoingChainInterceptor.java +++ b/api/src/main/java/org/apache/cxf/interceptor/OutgoingChainInterceptor.java @@ -57,6 +57,10 @@ public class OutgoingChainInterceptor extends AbstractPhaseInterceptor<Message> public void handleMessage(Message message) { Exchange ex = message.getExchange(); BindingOperationInfo binding = ex.get(BindingOperationInfo.class); + //if we get this far, we're going to be outputting some valid content, but we COULD + //also be "echoing" some of the content from the input. Thus, we need to + //mark it as requiring the input to be cached. 
+ message.put("cxf.io.cacheinput", Boolean.TRUE); if (null != binding && null != binding.getOperationInfo() && binding.getOperationInfo().isOneWay()) { closeInput(message); return; http://git-wip-us.apache.org/repos/asf/cxf/blob/643b1bc7/rt/transports/http/src/main/java/org/apache/cxf/transport/http/AbstractHTTPDestination.java ---------------------------------------------------------------------- diff --git a/rt/transports/http/src/main/java/org/apache/cxf/transport/http/AbstractHTTPDestination.java b/rt/transports/http/src/main/java/org/apache/cxf/transport/http/AbstractHTTPDestination.java index 68b7ae5..19510d9 100644 --- a/rt/transports/http/src/main/java/org/apache/cxf/transport/http/AbstractHTTPDestination.java +++ b/rt/transports/http/src/main/java/org/apache/cxf/transport/http/AbstractHTTPDestination.java @@ -523,21 +523,33 @@ public abstract class AbstractHTTPDestination if (inMessage == null) { return; } - Collection<Attachment> atts = inMessage.getAttachments(); - if (atts != null) { - for (Attachment a : atts) { - if (a.getDataHandler().getDataSource() instanceof AttachmentDataSource) { - try { - ((AttachmentDataSource)a.getDataHandler().getDataSource()).cache(inMessage); - } catch (IOException e) { - throw new Fault(e); + Object o = inMessage.get("cxf.io.cacheinput"); + DelegatingInputStream in = inMessage.getContent(DelegatingInputStream.class); + if (MessageUtils.isTrue(o)) { + Collection<Attachment> atts = inMessage.getAttachments(); + if (atts != null) { + for (Attachment a : atts) { + if (a.getDataHandler().getDataSource() instanceof AttachmentDataSource) { + try { + ((AttachmentDataSource)a.getDataHandler().getDataSource()).cache(inMessage); + } catch (IOException e) { + throw new Fault(e); + } } } } - } - DelegatingInputStream in = inMessage.getContent(DelegatingInputStream.class); - if (in != null) { - in.cacheInput(); + if (in != null) { + in.cacheInput(); + } + } else if (in != null) { + //We don't need to cache it, but we may need to 
consume it in order for the client + // to be able to receive a response. (could be blocked sending) + //However, also don't want to consume indefinitely. We'll limit to 16M. + try { + IOUtils.consume(in, 16 * 1024 * 1024); + } catch (IOException ioe) { + //ignore + } } }
