This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch exchange-factory
in repository https://gitbox.apache.org/repos/asf/camel.git

commit ab56b04631203cf7b7b7f058419e1c188b263ff1
Author: Claus Ibsen <[email protected]>
AuthorDate: Mon Feb 22 09:48:44 2021 +0100

    CAMEL-16222: PooledExchangeFactory experiment
---
 .../camel/component/paho/mqtt5/PahoMqtt5Consumer.java      | 14 +++++++++++++-
 .../camel/component/paho/mqtt5/PahoMqtt5Endpoint.java      | 13 -------------
 .../java/org/apache/camel/component/paho/PahoConsumer.java | 14 +++++++++++++-
 .../java/org/apache/camel/component/paho/PahoEndpoint.java | 14 --------------
 .../pg/replication/slot/PgReplicationSlotConsumer.java     |  2 +-
 .../apache/camel/component/pgevent/PgEventConsumer.java    | 10 +++++++---
 .../platform/http/vertx/VertxPlatformHttpConsumer.java     |  4 +---
 .../org/apache/camel/component/pubnub/PubNubConsumer.java  |  9 ++++-----
 8 files changed, 39 insertions(+), 41 deletions(-)

diff --git a/components/camel-paho-mqtt5/src/main/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5Consumer.java b/components/camel-paho-mqtt5/src/main/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5Consumer.java
index 57260b0..af8345a 100644
--- a/components/camel-paho-mqtt5/src/main/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5Consumer.java
+++ b/components/camel-paho-mqtt5/src/main/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5Consumer.java
@@ -103,7 +103,7 @@ public class PahoMqtt5Consumer extends DefaultConsumer {
             @Override
             public void messageArrived(String topic, MqttMessage message) throws Exception {
                 LOG.debug("Message arrived on topic: {} -> {}", topic, message);
-                Exchange exchange = getEndpoint().createExchange(message, topic);
+                Exchange exchange = createExchange(message, topic);
 
                 getAsyncProcessor().process(exchange, doneSync -> {
                     // noop
@@ -144,4 +144,16 @@ public class PahoMqtt5Consumer extends DefaultConsumer {
         return (PahoMqtt5Endpoint) super.getEndpoint();
     }
 
+    public Exchange createExchange(MqttMessage mqttMessage, String topic) {
+        Exchange exchange = createExchange(true);
+
+        PahoMqtt5Message paho = new PahoMqtt5Message(exchange.getContext(), mqttMessage);
+        paho.setBody(mqttMessage.getPayload());
+        paho.setHeader(PahoMqtt5Constants.MQTT_TOPIC, topic);
+        paho.setHeader(PahoMqtt5Constants.MQTT_QOS, mqttMessage.getQos());
+
+        exchange.setIn(paho);
+        return exchange;
+    }
+
 }
diff --git a/components/camel-paho-mqtt5/src/main/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5Endpoint.java b/components/camel-paho-mqtt5/src/main/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5Endpoint.java
index f558ac1..725df0d 100644
--- a/components/camel-paho-mqtt5/src/main/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5Endpoint.java
+++ b/components/camel-paho-mqtt5/src/main/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5Endpoint.java
@@ -20,7 +20,6 @@ import java.util.UUID;
 
 import org.apache.camel.Category;
 import org.apache.camel.Consumer;
-import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.spi.Metadata;
@@ -83,18 +82,6 @@ public class PahoMqtt5Endpoint extends DefaultEndpoint {
         return topic;
     }
 
-    public Exchange createExchange(MqttMessage mqttMessage, String topic) {
-        Exchange exchange = createExchange();
-
-        PahoMqtt5Message paho = new PahoMqtt5Message(exchange.getContext(), mqttMessage);
-        paho.setBody(mqttMessage.getPayload());
-        paho.setHeader(PahoMqtt5Constants.MQTT_TOPIC, topic);
-        paho.setHeader(PahoMqtt5Constants.MQTT_QOS, mqttMessage.getQos());
-
-        exchange.setIn(paho);
-        return exchange;
-    }
-
     protected MqttConnectionOptions createMqttConnectionOptions() {
         PahoMqtt5Configuration config = getConfiguration();
         MqttConnectionOptions options = new MqttConnectionOptions();
diff --git a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java
index 4e42547..f908efd 100644
--- a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java
+++ b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java
@@ -92,7 +92,7 @@ public class PahoConsumer extends DefaultConsumer {
             @Override
             public void messageArrived(String topic, MqttMessage message) throws Exception {
                 LOG.debug("Message arrived on topic: {} -> {}", topic, message);
-                Exchange exchange = getEndpoint().createExchange(message, topic);
+                Exchange exchange = createExchange(message, topic);
 
                 getAsyncProcessor().process(exchange, new AsyncCallback() {
                     @Override
@@ -136,4 +136,16 @@ public class PahoConsumer extends DefaultConsumer {
         return (PahoEndpoint) super.getEndpoint();
     }
 
+    public Exchange createExchange(MqttMessage mqttMessage, String topic) {
+        Exchange exchange = createExchange(true);
+
+        PahoMessage paho = new PahoMessage(exchange.getContext(), mqttMessage);
+        paho.setBody(mqttMessage.getPayload());
+        paho.setHeader(PahoConstants.MQTT_TOPIC, topic);
+        paho.setHeader(PahoConstants.MQTT_QOS, mqttMessage.getQos());
+
+        exchange.setIn(paho);
+        return exchange;
+    }
+
 }
diff --git a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoEndpoint.java b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoEndpoint.java
index 920cc28..d885204 100644
--- a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoEndpoint.java
+++ b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoEndpoint.java
@@ -18,7 +18,6 @@ package org.apache.camel.component.paho;
 
 import org.apache.camel.Category;
 import org.apache.camel.Consumer;
-import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.spi.Metadata;
@@ -30,7 +29,6 @@ import org.apache.camel.util.ObjectHelper;
 import org.eclipse.paho.client.mqttv3.MqttClient;
 import org.eclipse.paho.client.mqttv3.MqttClientPersistence;
 import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
-import org.eclipse.paho.client.mqttv3.MqttMessage;
 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
 import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence;
 
@@ -80,18 +78,6 @@ public class PahoEndpoint extends DefaultEndpoint {
         return topic;
     }
 
-    public Exchange createExchange(MqttMessage mqttMessage, String topic) {
-        Exchange exchange = createExchange();
-
-        PahoMessage paho = new PahoMessage(exchange.getContext(), mqttMessage);
-        paho.setBody(mqttMessage.getPayload());
-        paho.setHeader(PahoConstants.MQTT_TOPIC, topic);
-        paho.setHeader(PahoConstants.MQTT_QOS, mqttMessage.getQos());
-
-        exchange.setIn(paho);
-        return exchange;
-    }
-
     protected static MqttConnectOptions createMqttConnectOptions(PahoConfiguration config) {
         MqttConnectOptions mq = new MqttConnectOptions();
         if (ObjectHelper.isNotEmpty(config.getUserName()) && ObjectHelper.isNotEmpty(config.getPassword())) {
diff --git a/components/camel-pg-replication-slot/src/main/java/org/apache/camel/component/pg/replication/slot/PgReplicationSlotConsumer.java b/components/camel-pg-replication-slot/src/main/java/org/apache/camel/component/pg/replication/slot/PgReplicationSlotConsumer.java
index 9b2dcc3..d911431 100644
--- a/components/camel-pg-replication-slot/src/main/java/org/apache/camel/component/pg/replication/slot/PgReplicationSlotConsumer.java
+++ b/components/camel-pg-replication-slot/src/main/java/org/apache/camel/component/pg/replication/slot/PgReplicationSlotConsumer.java
@@ -125,7 +125,7 @@ public class PgReplicationSlotConsumer extends ScheduledPollConsumer {
             throw e;
         }
 
-        Exchange exchange = this.endpoint.createExchange();
+        Exchange exchange = createExchange(true);
         exchange.setExchangeId(stream.getLastReceiveLSN().asString());
 
         Message message = exchange.getIn();
diff --git a/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventConsumer.java b/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventConsumer.java
index 0c248a4..aebe8e3 100644
--- a/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventConsumer.java
+++ b/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventConsumer.java
@@ -59,18 +59,22 @@ public class PgEventConsumer extends DefaultConsumer implements PGNotificationLi
             LOG.debug("Notification processId: {}, channel: {}, payload: {}", processId, channel, payload);
         }
 
-        Exchange exchange = endpoint.createExchange();
+        Exchange exchange = createExchange(false);
         Message msg = exchange.getIn();
         msg.setHeader("channel", channel);
         msg.setBody(payload);
 
         try {
             getProcessor().process(exchange);
-        } catch (Exception ex) {
+        } catch (Exception e) {
+            exchange.setException(e);
+        }
+        if (exchange.getException() != null) {
             String cause = "Unable to process incoming notification from PostgreSQL: processId='" + processId + "', channel='"
                            + channel + "', payload='" + payload + "'";
-            getExceptionHandler().handleException(cause, ex);
+            getExceptionHandler().handleException(cause, exchange.getException());
         }
+        releaseExchange(exchange, false);
     }
 
     @Override
diff --git a/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpConsumer.java b/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpConsumer.java
index 8937de5..a65d4d4 100644
--- a/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpConsumer.java
+++ b/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpConsumer.java
@@ -64,11 +64,9 @@ public class VertxPlatformHttpConsumer extends DefaultConsumer {
     private final String fileNameExtWhitelist;
     private Set<Method> methods;
     private String path;
-
     private Route route;
 
-    public VertxPlatformHttpConsumer(
-                                     PlatformHttpEndpoint endpoint,
+    public VertxPlatformHttpConsumer(PlatformHttpEndpoint endpoint,
                                      Processor processor,
                                      List<Handler<RoutingContext>> handlers) {
         super(endpoint, processor);
diff --git a/components/camel-pubnub/src/main/java/org/apache/camel/component/pubnub/PubNubConsumer.java b/components/camel-pubnub/src/main/java/org/apache/camel/component/pubnub/PubNubConsumer.java
index 243aeb0..ad3e200 100644
--- a/components/camel-pubnub/src/main/java/org/apache/camel/component/pubnub/PubNubConsumer.java
+++ b/components/camel-pubnub/src/main/java/org/apache/camel/component/pubnub/PubNubConsumer.java
@@ -103,7 +103,7 @@ public class PubNubConsumer extends DefaultConsumer {
 
         @Override
         public void message(PubNub pubnub, PNMessageResult message) {
-            Exchange exchange = endpoint.createExchange();
+            Exchange exchange = createExchange(true);
             Message inmessage = exchange.getIn();
             inmessage.setBody(message);
             inmessage.setHeader(TIMETOKEN, message.getTimetoken());
@@ -111,13 +111,13 @@ public class PubNubConsumer extends DefaultConsumer {
             try {
                 getProcessor().process(exchange);
             } catch (Exception e) {
-                getExceptionHandler().handleException("Error processing exchange", exchange, e);
+                getExceptionHandler().handleException("Error processing exchange", e);
             }
         }
 
         @Override
         public void presence(PubNub pubnub, PNPresenceEventResult presence) {
-            Exchange exchange = endpoint.createExchange();
+            Exchange exchange = createExchange(true);
             Message inmessage = exchange.getIn();
             inmessage.setBody(presence);
             inmessage.setHeader(TIMETOKEN, presence.getTimetoken());
@@ -125,8 +125,7 @@ public class PubNubConsumer extends DefaultConsumer {
             try {
                 getProcessor().process(exchange);
             } catch (Exception e) {
-                exchange.setException(e);
-                getExceptionHandler().handleException("Error processing exchange", exchange, e);
+                getExceptionHandler().handleException("Error processing exchange", e);
             }
         }
 

Reply via email to