This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch exchange-factory in repository https://gitbox.apache.org/repos/asf/camel.git
commit ab56b04631203cf7b7b7f058419e1c188b263ff1 Author: Claus Ibsen <[email protected]> AuthorDate: Mon Feb 22 09:48:44 2021 +0100 CAMEL-16222: PooledExchangeFactory experiment --- .../camel/component/paho/mqtt5/PahoMqtt5Consumer.java | 14 +++++++++++++- .../camel/component/paho/mqtt5/PahoMqtt5Endpoint.java | 13 ------------- .../java/org/apache/camel/component/paho/PahoConsumer.java | 14 +++++++++++++- .../java/org/apache/camel/component/paho/PahoEndpoint.java | 14 -------------- .../pg/replication/slot/PgReplicationSlotConsumer.java | 2 +- .../apache/camel/component/pgevent/PgEventConsumer.java | 10 +++++++--- .../platform/http/vertx/VertxPlatformHttpConsumer.java | 4 +--- .../org/apache/camel/component/pubnub/PubNubConsumer.java | 9 ++++----- 8 files changed, 39 insertions(+), 41 deletions(-) diff --git a/components/camel-paho-mqtt5/src/main/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5Consumer.java b/components/camel-paho-mqtt5/src/main/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5Consumer.java index 57260b0..af8345a 100644 --- a/components/camel-paho-mqtt5/src/main/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5Consumer.java +++ b/components/camel-paho-mqtt5/src/main/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5Consumer.java @@ -103,7 +103,7 @@ public class PahoMqtt5Consumer extends DefaultConsumer { @Override public void messageArrived(String topic, MqttMessage message) throws Exception { LOG.debug("Message arrived on topic: {} -> {}", topic, message); - Exchange exchange = getEndpoint().createExchange(message, topic); + Exchange exchange = createExchange(message, topic); getAsyncProcessor().process(exchange, doneSync -> { // noop @@ -144,4 +144,16 @@ public class PahoMqtt5Consumer extends DefaultConsumer { return (PahoMqtt5Endpoint) super.getEndpoint(); } + public Exchange createExchange(MqttMessage mqttMessage, String topic) { + Exchange exchange = createExchange(true); + + PahoMqtt5Message paho = new PahoMqtt5Message(exchange.getContext(), mqttMessage); + paho.setBody(mqttMessage.getPayload()); + paho.setHeader(PahoMqtt5Constants.MQTT_TOPIC, topic); + paho.setHeader(PahoMqtt5Constants.MQTT_QOS, mqttMessage.getQos()); + + exchange.setIn(paho); + return exchange; + } + } diff --git a/components/camel-paho-mqtt5/src/main/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5Endpoint.java b/components/camel-paho-mqtt5/src/main/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5Endpoint.java index f558ac1..725df0d 100644 --- a/components/camel-paho-mqtt5/src/main/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5Endpoint.java +++ b/components/camel-paho-mqtt5/src/main/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5Endpoint.java @@ -20,7 +20,6 @@ import java.util.UUID; import org.apache.camel.Category; import org.apache.camel.Consumer; -import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.Producer; import org.apache.camel.spi.Metadata; @@ -83,18 +82,6 @@ public class PahoMqtt5Endpoint extends DefaultEndpoint { return topic; } - public Exchange createExchange(MqttMessage mqttMessage, String topic) { - Exchange exchange = createExchange(); - - PahoMqtt5Message paho = new PahoMqtt5Message(exchange.getContext(), mqttMessage); - paho.setBody(mqttMessage.getPayload()); - paho.setHeader(PahoMqtt5Constants.MQTT_TOPIC, topic); - paho.setHeader(PahoMqtt5Constants.MQTT_QOS, mqttMessage.getQos()); - - exchange.setIn(paho); - return exchange; - } - protected MqttConnectionOptions createMqttConnectionOptions() { PahoMqtt5Configuration config = getConfiguration(); MqttConnectionOptions options = new MqttConnectionOptions(); diff --git a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java index 4e42547..f908efd 100644 --- a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java +++ b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java @@ -92,7 +92,7 @@ public class PahoConsumer extends DefaultConsumer { @Override public void messageArrived(String topic, MqttMessage message) throws Exception { LOG.debug("Message arrived on topic: {} -> {}", topic, message); - Exchange exchange = getEndpoint().createExchange(message, topic); + Exchange exchange = createExchange(message, topic); getAsyncProcessor().process(exchange, new AsyncCallback() { @Override @@ -136,4 +136,16 @@ public class PahoConsumer extends DefaultConsumer { return (PahoEndpoint) super.getEndpoint(); } + public Exchange createExchange(MqttMessage mqttMessage, String topic) { + Exchange exchange = createExchange(true); + + PahoMessage paho = new PahoMessage(exchange.getContext(), mqttMessage); + paho.setBody(mqttMessage.getPayload()); + paho.setHeader(PahoConstants.MQTT_TOPIC, topic); + paho.setHeader(PahoConstants.MQTT_QOS, mqttMessage.getQos()); + + exchange.setIn(paho); + return exchange; + } + } diff --git a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoEndpoint.java b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoEndpoint.java index 920cc28..d885204 100644 --- a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoEndpoint.java +++ b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoEndpoint.java @@ -18,7 +18,6 @@ package org.apache.camel.component.paho; import org.apache.camel.Category; import org.apache.camel.Consumer; -import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.Producer; import org.apache.camel.spi.Metadata; @@ -30,7 +29,6 @@ import org.apache.camel.util.ObjectHelper; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttClientPersistence; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; -import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence; @@ -80,18 +78,6 @@ public class PahoEndpoint extends DefaultEndpoint { return topic; } - public Exchange createExchange(MqttMessage mqttMessage, String topic) { - Exchange exchange = createExchange(); - - PahoMessage paho = new PahoMessage(exchange.getContext(), mqttMessage); - paho.setBody(mqttMessage.getPayload()); - paho.setHeader(PahoConstants.MQTT_TOPIC, topic); - paho.setHeader(PahoConstants.MQTT_QOS, mqttMessage.getQos()); - - exchange.setIn(paho); - return exchange; - } - protected static MqttConnectOptions createMqttConnectOptions(PahoConfiguration config) { MqttConnectOptions mq = new MqttConnectOptions(); if (ObjectHelper.isNotEmpty(config.getUserName()) && ObjectHelper.isNotEmpty(config.getPassword())) { diff --git a/components/camel-pg-replication-slot/src/main/java/org/apache/camel/component/pg/replication/slot/PgReplicationSlotConsumer.java b/components/camel-pg-replication-slot/src/main/java/org/apache/camel/component/pg/replication/slot/PgReplicationSlotConsumer.java index 9b2dcc3..d911431 100644 --- a/components/camel-pg-replication-slot/src/main/java/org/apache/camel/component/pg/replication/slot/PgReplicationSlotConsumer.java +++ b/components/camel-pg-replication-slot/src/main/java/org/apache/camel/component/pg/replication/slot/PgReplicationSlotConsumer.java @@ -125,7 +125,7 @@ public class PgReplicationSlotConsumer extends ScheduledPollConsumer { throw e; } - Exchange exchange = this.endpoint.createExchange(); + Exchange exchange = createExchange(true); exchange.setExchangeId(stream.getLastReceiveLSN().asString()); Message message = exchange.getIn(); diff --git a/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventConsumer.java b/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventConsumer.java index 0c248a4..aebe8e3 100644 --- a/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventConsumer.java +++ b/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventConsumer.java @@ -59,18 +59,22 @@ public class PgEventConsumer extends DefaultConsumer implements PGNotificationLi LOG.debug("Notification processId: {}, channel: {}, payload: {}", processId, channel, payload); } - Exchange exchange = endpoint.createExchange(); + Exchange exchange = createExchange(false); Message msg = exchange.getIn(); msg.setHeader("channel", channel); msg.setBody(payload); try { getProcessor().process(exchange); - } catch (Exception ex) { + } catch (Exception e) { + exchange.setException(e); + } + if (exchange.getException() != null) { String cause = "Unable to process incoming notification from PostgreSQL: processId='" + processId + "', channel='" + channel + "', payload='" + payload + "'"; - getExceptionHandler().handleException(cause, ex); + getExceptionHandler().handleException(cause, exchange.getException()); } + releaseExchange(exchange, false); } @Override diff --git a/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpConsumer.java b/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpConsumer.java index 8937de5..a65d4d4 100644 --- a/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpConsumer.java +++ b/components/camel-platform-http-vertx/src/main/java/org/apache/camel/component/platform/http/vertx/VertxPlatformHttpConsumer.java @@ -64,11 +64,9 @@ public class VertxPlatformHttpConsumer extends DefaultConsumer { private final String fileNameExtWhitelist; private Set<Method> methods; private String path; - private Route route; - public VertxPlatformHttpConsumer( - PlatformHttpEndpoint endpoint, + public VertxPlatformHttpConsumer(PlatformHttpEndpoint endpoint, Processor processor, List<Handler<RoutingContext>> handlers) { super(endpoint, processor); diff --git a/components/camel-pubnub/src/main/java/org/apache/camel/component/pubnub/PubNubConsumer.java b/components/camel-pubnub/src/main/java/org/apache/camel/component/pubnub/PubNubConsumer.java index 243aeb0..ad3e200 100644 --- a/components/camel-pubnub/src/main/java/org/apache/camel/component/pubnub/PubNubConsumer.java +++ b/components/camel-pubnub/src/main/java/org/apache/camel/component/pubnub/PubNubConsumer.java @@ -103,7 +103,7 @@ public class PubNubConsumer extends DefaultConsumer { @Override public void message(PubNub pubnub, PNMessageResult message) { - Exchange exchange = endpoint.createExchange(); + Exchange exchange = createExchange(true); Message inmessage = exchange.getIn(); inmessage.setBody(message); inmessage.setHeader(TIMETOKEN, message.getTimetoken()); @@ -111,13 +111,13 @@ public class PubNubConsumer extends DefaultConsumer { try { getProcessor().process(exchange); } catch (Exception e) { - getExceptionHandler().handleException("Error processing exchange", exchange, e); + getExceptionHandler().handleException("Error processing exchange", e); } } @Override public void presence(PubNub pubnub, PNPresenceEventResult presence) { - Exchange exchange = endpoint.createExchange(); + Exchange exchange = createExchange(true); Message inmessage = exchange.getIn(); inmessage.setBody(presence); inmessage.setHeader(TIMETOKEN, presence.getTimetoken()); @@ -125,8 +125,7 @@ public class PubNubConsumer extends DefaultConsumer { try { getProcessor().process(exchange); } catch (Exception e) { - exchange.setException(e); - getExceptionHandler().handleException("Error processing exchange", exchange, e); + getExceptionHandler().handleException("Error processing exchange", e); } }
