This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch exchange-factory
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 23172db0bf5083bc54290dc315f509e881ea6cbf
Author: Claus Ibsen <[email protected]>
AuthorDate: Tue Feb 23 10:31:55 2021 +0100

    CAMEL-16222: PooledExchangeFactory experiment
---
 .../apache/camel/component/sjms/SjmsEndpoint.java    |  8 ++++----
 .../sjms/consumer/EndpointMessageListener.java       | 20 ++++++++++++++++++--
 2 files changed, 22 insertions(+), 6 deletions(-)

diff --git 
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
 
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
index d8e1bee..c65df4f 100644
--- 
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
+++ 
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
@@ -318,13 +318,13 @@ public class SjmsEndpoint extends DefaultEndpoint
 
     @Override
     public Consumer createConsumer(Processor processor) throws Exception {
-        EndpointMessageListener listener = new EndpointMessageListener(this, 
processor);
-        configureMessageListener(listener);
-
         MessageListenerContainer container = 
createMessageListenerContainer(this);
+        SjmsConsumer consumer = new SjmsConsumer(this, processor, container);
+
+        EndpointMessageListener listener = new 
EndpointMessageListener(consumer, this, processor);
+        configureMessageListener(listener);
         container.setMessageListener(listener);
 
-        SjmsConsumer consumer = new SjmsConsumer(this, processor, container);
         configureConsumer(consumer);
         return consumer;
     }
diff --git 
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/EndpointMessageListener.java
 
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/EndpointMessageListener.java
index 8c7b9d3..b3d9f11 100644
--- 
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/EndpointMessageListener.java
+++ 
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/EndpointMessageListener.java
@@ -35,7 +35,9 @@ import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.component.sjms.SessionCallback;
 import org.apache.camel.component.sjms.SessionMessageListener;
 import org.apache.camel.component.sjms.SjmsConstants;
+import org.apache.camel.component.sjms.SjmsConsumer;
 import org.apache.camel.component.sjms.SjmsEndpoint;
+import org.apache.camel.component.sjms.SjmsMessage;
 import org.apache.camel.component.sjms.SjmsTemplate;
 import org.apache.camel.component.sjms.jms.JmsMessageHelper;
 import org.apache.camel.support.AsyncProcessorConverterHelper;
@@ -54,6 +56,7 @@ public class EndpointMessageListener implements 
SessionMessageListener {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(EndpointMessageListener.class);
 
+    private final SjmsConsumer consumer;
     private final SjmsEndpoint endpoint;
     private final AsyncProcessor processor;
     private Object replyToDestination;
@@ -63,7 +66,8 @@ public class EndpointMessageListener implements 
SessionMessageListener {
     private String eagerPoisonBody;
     private volatile SjmsTemplate template;
 
-    public EndpointMessageListener(SjmsEndpoint endpoint, Processor processor) 
{
+    public EndpointMessageListener(SjmsConsumer consumer, SjmsEndpoint 
endpoint, Processor processor) {
+        this.consumer = consumer;
         this.endpoint = endpoint;
         this.processor = AsyncProcessorConverterHelper.convert(processor);
     }
@@ -207,6 +211,9 @@ public class EndpointMessageListener implements 
SessionMessageListener {
             // if we failed processed the exchange from the async callback 
task, then grab the exception
             rce = exchange.getException(RuntimeCamelException.class);
 
+            // the exchange is now done so release it
+            consumer.releaseExchange(exchange, false);
+
         } catch (Exception e) {
             rce = wrapRuntimeCamelException(e);
         }
@@ -234,7 +241,10 @@ public class EndpointMessageListener implements 
SessionMessageListener {
     }
 
     public Exchange createExchange(Message message, Session session, Object 
replyDestination) {
-        Exchange exchange = endpoint.createExchange(message, session);
+        Exchange exchange = consumer.createExchange(false);
+        // create a new SjmsMessage as it cannot be reset for reuse
+        // TODO: optimize to allow reset
+        exchange.setIn(new SjmsMessage(exchange, message, session, 
endpoint.getBinding()));
 
         // lets set to an InOut if we have some kind of reply-to destination
         if (replyDestination != null && !disableReplyTo) {
@@ -455,6 +465,12 @@ public class EndpointMessageListener implements 
SessionMessageListener {
                     }
                 }
             }
+
+            // if we completed from async processing then we should release 
the exchange
+            // the sync processing will release the exchange outside this 
callback
+            if (!doneSync) {
+                consumer.releaseExchange(exchange, false);
+            }
         }
     }
 

Reply via email to