This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch exchange-factory in repository https://gitbox.apache.org/repos/asf/camel.git
commit 23172db0bf5083bc54290dc315f509e881ea6cbf Author: Claus Ibsen <[email protected]> AuthorDate: Tue Feb 23 10:31:55 2021 +0100 CAMEL-16222: PooledExchangeFactory experiment --- .../apache/camel/component/sjms/SjmsEndpoint.java | 8 ++++---- .../sjms/consumer/EndpointMessageListener.java | 20 ++++++++++++++++++-- 2 files changed, 22 insertions(+), 6 deletions(-) diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java index d8e1bee..c65df4f 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java @@ -318,13 +318,13 @@ public class SjmsEndpoint extends DefaultEndpoint @Override public Consumer createConsumer(Processor processor) throws Exception { - EndpointMessageListener listener = new EndpointMessageListener(this, processor); - configureMessageListener(listener); - MessageListenerContainer container = createMessageListenerContainer(this); + SjmsConsumer consumer = new SjmsConsumer(this, processor, container); + + EndpointMessageListener listener = new EndpointMessageListener(consumer, this, processor); + configureMessageListener(listener); container.setMessageListener(listener); - SjmsConsumer consumer = new SjmsConsumer(this, processor, container); configureConsumer(consumer); return consumer; } diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/EndpointMessageListener.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/EndpointMessageListener.java index 8c7b9d3..b3d9f11 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/EndpointMessageListener.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/EndpointMessageListener.java @@ -35,7 +35,9 @@ import 
org.apache.camel.RuntimeCamelException; import org.apache.camel.component.sjms.SessionCallback; import org.apache.camel.component.sjms.SessionMessageListener; import org.apache.camel.component.sjms.SjmsConstants; +import org.apache.camel.component.sjms.SjmsConsumer; import org.apache.camel.component.sjms.SjmsEndpoint; +import org.apache.camel.component.sjms.SjmsMessage; import org.apache.camel.component.sjms.SjmsTemplate; import org.apache.camel.component.sjms.jms.JmsMessageHelper; import org.apache.camel.support.AsyncProcessorConverterHelper; @@ -54,6 +56,7 @@ public class EndpointMessageListener implements SessionMessageListener { private static final Logger LOG = LoggerFactory.getLogger(EndpointMessageListener.class); + private final SjmsConsumer consumer; private final SjmsEndpoint endpoint; private final AsyncProcessor processor; private Object replyToDestination; @@ -63,7 +66,8 @@ public class EndpointMessageListener implements SessionMessageListener { private String eagerPoisonBody; private volatile SjmsTemplate template; - public EndpointMessageListener(SjmsEndpoint endpoint, Processor processor) { + public EndpointMessageListener(SjmsConsumer consumer, SjmsEndpoint endpoint, Processor processor) { + this.consumer = consumer; this.endpoint = endpoint; this.processor = AsyncProcessorConverterHelper.convert(processor); } @@ -207,6 +211,9 @@ public class EndpointMessageListener implements SessionMessageListener { // if we failed processed the exchange from the async callback task, then grab the exception rce = exchange.getException(RuntimeCamelException.class); + // the exchange is now done so release it + consumer.releaseExchange(exchange, false); + } catch (Exception e) { rce = wrapRuntimeCamelException(e); } @@ -234,7 +241,10 @@ public class EndpointMessageListener implements SessionMessageListener { } public Exchange createExchange(Message message, Session session, Object replyDestination) { - Exchange exchange = endpoint.createExchange(message, session); 
+ Exchange exchange = consumer.createExchange(false); + // create a new SjmsMessage as it cannot be reset for reuse + // TODO: optimize to allow reset + exchange.setIn(new SjmsMessage(exchange, message, session, endpoint.getBinding())); // lets set to an InOut if we have some kind of reply-to destination if (replyDestination != null && !disableReplyTo) { @@ -455,6 +465,12 @@ public class EndpointMessageListener implements SessionMessageListener { } } } + + // if we completed from async processing then we should release the exchange + // the sync processing will release the exchange outside this callback + if (!doneSync) { + consumer.releaseExchange(exchange, false); + } } }
