This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch exchange-factory in repository https://gitbox.apache.org/repos/asf/camel.git
commit f1b901072b1e4211dc9b905130416bdf6dc09cb2 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Tue Feb 23 14:30:44 2021 +0100 CAMEL-16222: PooledExchangeFactory experiment --- .../component/jms/EndpointMessageListener.java | 28 +++++++- .../apache/camel/component/jms/JmsConsumer.java | 2 +- .../org/apache/camel/component/jms/JmsMessage.java | 8 +++ .../component/jms/JmsInOnlyPooledExchangeTest.java | 79 ++++++++++++++++++++++ 4 files changed, 113 insertions(+), 4 deletions(-) diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java index 05225c2..872b2af 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java @@ -26,6 +26,7 @@ import org.apache.camel.AsyncCallback; import org.apache.camel.AsyncProcessor; import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; +import org.apache.camel.ExtendedExchange; import org.apache.camel.Processor; import org.apache.camel.RollbackExchangeException; import org.apache.camel.RuntimeCamelException; @@ -45,6 +46,7 @@ import static org.apache.camel.RuntimeCamelException.wrapRuntimeCamelException; */ public class EndpointMessageListener implements SessionAwareMessageListener { private static final Logger LOG = LoggerFactory.getLogger(EndpointMessageListener.class); + private final JmsConsumer consumer; private final JmsEndpoint endpoint; private final AsyncProcessor processor; private JmsBinding binding; @@ -55,7 +57,8 @@ public class EndpointMessageListener implements SessionAwareMessageListener { private boolean disableReplyTo; private boolean async; - public EndpointMessageListener(JmsEndpoint endpoint, Processor processor) { + public EndpointMessageListener(JmsConsumer consumer, JmsEndpoint endpoint, 
Processor processor) { + this.consumer = consumer; this.endpoint = endpoint; this.processor = AsyncProcessorConverterHelper.convert(processor); } @@ -147,6 +150,9 @@ public class EndpointMessageListener implements SessionAwareMessageListener { // if we failed processed the exchange from the async callback task, then grab the exception rce = exchange.getException(RuntimeCamelException.class); + // the exchange is now done so release it + consumer.releaseExchange(exchange, false); + } catch (Exception e) { rce = wrapRuntimeCamelException(e); } @@ -250,14 +256,30 @@ public class EndpointMessageListener implements SessionAwareMessageListener { } } } + + // if we completed from async processing then we should release the exchange + // the sync processing will release the exchange outside this callback + if (!doneSync) { + consumer.releaseExchange(exchange, false); + } } } public Exchange createExchange(Message message, Session session, Object replyDestination) { - Exchange exchange = endpoint.createExchange(); + Exchange exchange = consumer.createExchange(false); JmsBinding binding = getBinding(); exchange.setProperty(Exchange.BINDING, binding); - exchange.setIn(new JmsMessage(exchange, message, session, binding)); + + // optimize: either create a new JmsMessage or reuse existing if exists + JmsMessage msg = exchange.adapt(ExtendedExchange.class).getInOrNull(JmsMessage.class); + if (msg == null) { + msg = new JmsMessage(exchange, message, session, binding); + exchange.setIn(msg); + } else { + msg.setJmsMessage(message); + msg.setJmsSession(session); + msg.setBinding(binding); + } // lets set to an InOut if we have some kind of reply-to destination if (replyDestination != null && !disableReplyTo) { diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java index 3fafd18..98fce1d 100644 --- 
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java @@ -75,7 +75,7 @@ public class JmsConsumer extends DefaultConsumer implements Suspendable { } protected void createMessageListener(JmsEndpoint endpoint, Processor processor) { - messageListener = new EndpointMessageListener(endpoint, processor); + messageListener = new EndpointMessageListener(this, endpoint, processor); getEndpoint().getConfiguration().configureMessageListener(messageListener); messageListener.setBinding(endpoint.getBinding()); messageListener.setAsync(endpoint.getConfiguration().isAsyncConsumer()); diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessage.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessage.java index b3efa26..3c8925a 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessage.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessage.java @@ -51,6 +51,14 @@ public class JmsMessage extends DefaultMessage { } @Override + public void reset() { + super.reset(); + jmsMessage = null; + jmsSession = null; + binding = null; + } + + @Override public String toString() { // do not print jmsMessage as there could be sensitive details if (jmsMessage != null) { diff --git a/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsInOnlyPooledExchangeTest.java b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsInOnlyPooledExchangeTest.java new file mode 100644 index 0000000..6d19057 --- /dev/null +++ b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsInOnlyPooledExchangeTest.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.jms; + +import javax.jms.ConnectionFactory; + +import org.apache.camel.CamelContext; +import org.apache.camel.ExtendedCamelContext; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.impl.engine.PooledExchangeFactory; +import org.apache.camel.test.junit5.CamelTestSupport; +import org.junit.jupiter.api.Test; + +import static org.apache.camel.component.jms.JmsComponent.jmsComponentAutoAcknowledge; + +public class JmsInOnlyPooledExchangeTest extends CamelTestSupport { + + private static final String JMS_QUEUE_NAME = "activemq:queue:foo"; + private static final String MOCK_RESULT = "mock:result"; + + @Test + public void testSynchronous() throws Exception { + final String expectedBody = "Hello World"; + MockEndpoint mock = getMockEndpoint(MOCK_RESULT); + mock.expectedMessageCount(1); + mock.expectedBodiesReceived(expectedBody); + + template.sendBody(JMS_QUEUE_NAME, expectedBody); + + mock.assertIsSatisfied(); + } + + @Test + public void testTwoSynchronous() throws Exception { + MockEndpoint mock = getMockEndpoint(MOCK_RESULT); + mock.expectedBodiesReceived("Hello World", "Bye World"); + + template.sendBody(JMS_QUEUE_NAME, "Hello World"); + template.sendBody(JMS_QUEUE_NAME, 
"Bye World"); + + mock.assertIsSatisfied(); + } + + @Override + protected CamelContext createCamelContext() throws Exception { + CamelContext camelContext = super.createCamelContext(); + camelContext.adapt(ExtendedCamelContext.class).setExchangeFactory(new PooledExchangeFactory()); + + ConnectionFactory connectionFactory = CamelJmsTestHelper.createConnectionFactory(); + camelContext.addComponent("activemq", jmsComponentAutoAcknowledge(connectionFactory)); + return camelContext; + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from(JMS_QUEUE_NAME) + .to(MOCK_RESULT); + } + }; + } +}