This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch exchange-factory
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 0245ed7f39b95fe5bbc5723b580688c5e2589501 Author: Claus Ibsen <[email protected]> AuthorDate: Tue Feb 23 10:46:16 2021 +0100 CAMEL-16222: PooledExchangeFactory experiment --- .../apache/camel/component/sjms/SjmsMessage.java | 8 ++++++++ .../sjms/consumer/EndpointMessageListener.java | 15 +++++++++++--- .../sjms/consumer/InOnlyConsumerQueueTest.java | 11 +++++++++++ ...ueueTest.java => InOnlyPooledExchangeTest.java} | 23 +++++++++++++++++++++- .../java/org/apache/camel/ExtendedExchange.java | 8 ++++++++ .../org/apache/camel/support/AbstractExchange.java | 12 +++++++++++ 6 files changed, 73 insertions(+), 4 deletions(-) diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsMessage.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsMessage.java index 35f066e..2f76d12 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsMessage.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsMessage.java @@ -53,6 +53,14 @@ public class SjmsMessage extends DefaultMessage { } @Override + public void reset() { + super.reset(); + jmsMessage = null; + jmsSession = null; + binding = null; + } + + @Override public String toString() { // do not print jmsMessage as there could be sensitive details if (jmsMessage != null) { diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/EndpointMessageListener.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/EndpointMessageListener.java index b3d9f11..b2c9515 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/EndpointMessageListener.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/EndpointMessageListener.java @@ -29,6 +29,7 @@ import org.apache.camel.AsyncProcessor; import org.apache.camel.CamelExchangeException; import org.apache.camel.Exchange; import 
org.apache.camel.ExchangePattern; +import org.apache.camel.ExtendedExchange; import org.apache.camel.Processor; import org.apache.camel.RollbackExchangeException; import org.apache.camel.RuntimeCamelException; @@ -242,9 +243,17 @@ public class EndpointMessageListener implements SessionMessageListener { public Exchange createExchange(Message message, Session session, Object replyDestination) { Exchange exchange = consumer.createExchange(false); - // create a mew SjmsMessage as it cannot be reset for reuse - // TODO: optimize to allow reset - exchange.setIn(new SjmsMessage(exchange, message, session, endpoint.getBinding())); + + // optimize: either create a new SjmsMessage or reuse existing if exists + SjmsMessage msg = exchange.adapt(ExtendedExchange.class).getInOrNull(SjmsMessage.class); + if (msg == null) { + msg = new SjmsMessage(exchange, message, session, endpoint.getBinding()); + exchange.setIn(msg); + } else { + msg.setJmsMessage(message); + msg.setJmsSession(session); + msg.setBinding(endpoint.getBinding()); + } // lets set to an InOut if we have some kind of reply-to destination if (replyDestination != null && !disableReplyTo) { diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/InOnlyConsumerQueueTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/InOnlyConsumerQueueTest.java index b55fec2..af55fc2 100644 --- a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/InOnlyConsumerQueueTest.java +++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/InOnlyConsumerQueueTest.java @@ -38,6 +38,17 @@ public class InOnlyConsumerQueueTest extends JmsTestSupport { mock.assertIsSatisfied(); } + @Test + public void testTwoSynchronous() throws Exception { + MockEndpoint mock = getMockEndpoint(MOCK_RESULT); + mock.expectedBodiesReceived("Hello World", "Bye World"); + + template.sendBody(SJMS_QUEUE_NAME, "Hello World"); + 
template.sendBody(SJMS_QUEUE_NAME, "Bye World"); + + mock.assertIsSatisfied(); + } + @Override protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/InOnlyConsumerQueueTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/InOnlyPooledExchangeTest.java similarity index 69% copy from components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/InOnlyConsumerQueueTest.java copy to components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/InOnlyPooledExchangeTest.java index b55fec2..9ddbf05 100644 --- a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/InOnlyConsumerQueueTest.java +++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/InOnlyPooledExchangeTest.java @@ -16,16 +16,26 @@ */ package org.apache.camel.component.sjms.consumer; +import org.apache.camel.CamelContext; +import org.apache.camel.ExtendedCamelContext; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; import org.apache.camel.component.sjms.support.JmsTestSupport; +import org.apache.camel.impl.engine.PooledExchangeFactory; import org.junit.jupiter.api.Test; -public class InOnlyConsumerQueueTest extends JmsTestSupport { +public class InOnlyPooledExchangeTest extends JmsTestSupport { private static final String SJMS_QUEUE_NAME = "sjms:queue:in.only.consumer.queue"; private static final String MOCK_RESULT = "mock:result"; + @Override + protected CamelContext createCamelContext() throws Exception { + CamelContext context = super.createCamelContext(); + context.adapt(ExtendedCamelContext.class).setExchangeFactory(new PooledExchangeFactory()); + return context; + } + @Test public void testSynchronous() throws Exception { final String expectedBody = "Hello World"; @@ -38,6 +48,17 @@ public class 
InOnlyConsumerQueueTest extends JmsTestSupport { mock.assertIsSatisfied(); } + @Test + public void testTwoSynchronous() throws Exception { + MockEndpoint mock = getMockEndpoint(MOCK_RESULT); + mock.expectedBodiesReceived("Hello World", "Bye World"); + + template.sendBody(SJMS_QUEUE_NAME, "Hello World"); + template.sendBody(SJMS_QUEUE_NAME, "Bye World"); + + mock.assertIsSatisfied(); + } + @Override protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { diff --git a/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java b/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java index bb0b523..7974ca1 100644 --- a/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java +++ b/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java @@ -29,6 +29,14 @@ import org.apache.camel.spi.UnitOfWork; public interface ExtendedExchange extends Exchange { /** + * If there is an existing inbound message of the given type then return it as-is, otherwise return null. + * + * @param type the given type + * @return the message if exists with the given type, otherwise null. + */ + <T> T getInOrNull(Class<T> type); + + /** * Sets the endpoint which originated this message exchange. 
This method should typically only be called by * {@link Endpoint} implementations */ diff --git a/core/camel-support/src/main/java/org/apache/camel/support/AbstractExchange.java b/core/camel-support/src/main/java/org/apache/camel/support/AbstractExchange.java index 6e5daa6..e4f1cdb 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/AbstractExchange.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/AbstractExchange.java @@ -347,6 +347,18 @@ class AbstractExchange implements ExtendedExchange { } @Override + public <T> T getInOrNull(Class<T> type) { + if (in == null) { + return null; + } + if (type.isInstance(in)) { + return type.cast(in); + } + + return null; + } + + @Override public void setIn(Message in) { this.in = in; configureMessage(in);
