Repository: camel Updated Branches: refs/heads/camel-2.17.x 19f619eee -> e822ae58c refs/heads/master f39b83eeb -> 2e9bce8a0
CAMEL-10171 memory leak when continuation expires setobject done earlier and exception set on camelExchange. isExpired method call thru Continuation interface is cancelled and below issues became invalid as discussed in CXF-7011 https://issues.apache.org/jira/browse/CXF-7002 https://issues.apache.org/jira/browse/CXF-7011 instead such block means the same; https://issues.apache.org/jira/browse/CXF-7011?focusedCommentId=15422696&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15422696 Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/2e9bce8a Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/2e9bce8a Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/2e9bce8a Branch: refs/heads/master Commit: 2e9bce8a064b6694cb7985955582baca90698b0c Parents: f39b83e Author: önder sezgin <ondersez...@gmail.com> Authored: Tue Aug 16 22:22:46 2016 +0300 Committer: önder sezgin <ondersez...@gmail.com> Committed: Tue Aug 16 22:22:46 2016 +0300 ---------------------------------------------------------------------- .../org/apache/camel/component/cxf/CxfConsumer.java | 14 +++++++++++++- .../camel/component/cxf/jaxrs/CxfRsInvoker.java | 14 +++++++++++++- 2 files changed, 26 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/2e9bce8a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java index 2271e2f..e16127b 100644 --- a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java +++ b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java @@ -26,6 +26,7 @@ import org.w3c.dom.Element; import org.apache.camel.AsyncCallback; import org.apache.camel.ExchangePattern; +import org.apache.camel.ExchangeTimedOutException; import org.apache.camel.Processor; import org.apache.camel.component.cxf.common.message.CxfConstants; import org.apache.camel.impl.DefaultConsumer; @@ -148,6 +149,8 @@ public class CxfConsumer extends DefaultConsumer { // The continuation could be called before the suspend is called continuation.suspend(cxfEndpoint.getContinuationTimeout()); + + continuation.setObject(camelExchange); // use the asynchronous API to process the exchange getAsyncProcessor().process(camelExchange, new AsyncCallback() { @@ -156,7 +159,6 @@ public class CxfConsumer extends DefaultConsumer { synchronized (continuation) { LOG.trace("Resuming continuation of exchangeId: {}", camelExchange.getExchangeId()); // resume processing after both, sync and async callbacks - continuation.setObject(camelExchange); continuation.resume(); } } @@ -170,6 +172,16 @@ public class CxfConsumer extends DefaultConsumer { CxfConsumer.this.doneUoW(camelExchange); } + } else if (!continuation.isResumed() && !continuation.isPending()) { + org.apache.camel.Exchange camelExchange = (org.apache.camel.Exchange)continuation.getObject(); + try { + if (!continuation.isPending()) { + camelExchange.setException(new ExchangeTimedOutException(camelExchange, cxfEndpoint.getContinuationTimeout())); + } + setResponseBack(cxfExchange, camelExchange); + } finally { + CxfConsumer.this.doneUoW(camelExchange); + } } } return null; http://git-wip-us.apache.org/repos/asf/camel/blob/2e9bce8a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsInvoker.java ---------------------------------------------------------------------- diff --git a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsInvoker.java b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsInvoker.java index fb999f0..01563d3 100644 --- a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsInvoker.java +++ b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsInvoker.java @@ -26,6 +26,7 @@ import javax.ws.rs.core.UriInfo; import org.apache.camel.AsyncCallback; import org.apache.camel.ExchangePattern; +import org.apache.camel.ExchangeTimedOutException; import org.apache.camel.RuntimeCamelException; import org.apache.cxf.continuations.Continuation; import org.apache.cxf.continuations.ContinuationProvider; @@ -90,13 +91,13 @@ public class CxfRsInvoker extends JAXRSInvoker { // The continuation could be called before the suspend is called continuation.suspend(endpoint.getContinuationTimeout()); cxfExchange.put(SUSPENED, Boolean.TRUE); + continuation.setObject(camelExchange); cxfRsConsumer.getAsyncProcessor().process(camelExchange, new AsyncCallback() { public void done(boolean doneSync) { // make sure the continuation resume will not be called before the suspend method in other thread synchronized (continuation) { LOG.trace("Resuming continuation of exchangeId: {}", camelExchange.getExchangeId()); // resume processing after both, sync and async callbacks - continuation.setObject(camelExchange); continuation.resume(); } } @@ -111,6 +112,17 @@ public class CxfRsInvoker extends JAXRSInvoker { } finally { cxfRsConsumer.doneUoW(camelExchange); } + } else { + if (!continuation.isPending()) { + cxfExchange.put(SUSPENED, Boolean.FALSE); + org.apache.camel.Exchange camelExchange = (org.apache.camel.Exchange)continuation.getObject(); + camelExchange.setException(new ExchangeTimedOutException(camelExchange, endpoint.getContinuationTimeout())); + try { + return returnResponse(cxfExchange, camelExchange); + } finally { + cxfRsConsumer.doneUoW(camelExchange); + } + } } } return null;