This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch exchange-factory in repository https://gitbox.apache.org/repos/asf/camel.git
commit 0405d1977236f5571f62723efc3c2802a6b0e2ac Author: Claus Ibsen <[email protected]> AuthorDate: Sun Feb 21 09:24:58 2021 +0100 CAMEL-16222: PooledExchangeFactory experiment --- .../java/org/apache/camel/component/ahc/ws/WsConsumer.java | 2 +- .../java/org/apache/camel/component/apns/ApnsConsumer.java | 2 +- .../java/org/apache/camel/component/as2/AS2Consumer.java | 9 +++++---- .../apache/camel/component/asterisk/AsteriskConsumer.java | 9 +++++---- .../integration/consumer/AtmosScheduledPollConsumer.java | 4 ---- .../consumer/AtmosScheduledPollGetConsumer.java | 14 +++++++------- .../component/atmosphere/websocket/WebsocketConsumer.java | 6 +++--- 7 files changed, 22 insertions(+), 24 deletions(-) diff --git a/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsConsumer.java b/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsConsumer.java index 1341bb4..8bf9923 100644 --- a/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsConsumer.java +++ b/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsConsumer.java @@ -68,7 +68,7 @@ public class WsConsumer extends DefaultConsumer { } private void sendMessageInternal(Object message) { - final Exchange exchange = getEndpoint().createExchange(); + final Exchange exchange = createExchange(true); //TODO may set some headers with some meta info (e.g., socket info, unique-id for correlation purpose, etc0 // set the body diff --git a/components/camel-apns/src/main/java/org/apache/camel/component/apns/ApnsConsumer.java b/components/camel-apns/src/main/java/org/apache/camel/component/apns/ApnsConsumer.java index d0782a6..5d45e70 100644 --- a/components/camel-apns/src/main/java/org/apache/camel/component/apns/ApnsConsumer.java +++ b/components/camel-apns/src/main/java/org/apache/camel/component/apns/ApnsConsumer.java @@ -51,7 +51,7 @@ public class ApnsConsumer extends ScheduledPollConsumer { while (it.hasNext()) { InactiveDevice inactiveDevice = it.next(); - Exchange e = getEndpoint().createExchange(); + Exchange e = createExchange(true); e.getIn().setBody(inactiveDevice); getProcessor().process(e); } diff --git a/components/camel-as2/camel-as2-component/src/main/java/org/apache/camel/component/as2/AS2Consumer.java b/components/camel-as2/camel-as2-component/src/main/java/org/apache/camel/component/as2/AS2Consumer.java index 7e767eb..08b68d2 100644 --- a/components/camel-as2/camel-as2-component/src/main/java/org/apache/camel/component/as2/AS2Consumer.java +++ b/components/camel-as2/camel-as2-component/src/main/java/org/apache/camel/component/as2/AS2Consumer.java @@ -124,17 +124,18 @@ public class AS2Consumer extends AbstractApiConsumer<AS2ApiName, AS2Configuratio = HttpMessageUtils.extractEdiPayload(request, as2ServerConnection.getDecryptingPrivateKey()); // Set AS2 Interchange property and EDI message into body of input message. - Exchange exchange = getEndpoint().createExchange(); - HttpCoreContext coreContext = HttpCoreContext.adapt(context); - exchange.setProperty(AS2Constants.AS2_INTERCHANGE, coreContext); - exchange.getIn().setBody(ediEntity.getEdiMessage()); + Exchange exchange = createExchange(false); try { + HttpCoreContext coreContext = HttpCoreContext.adapt(context); + exchange.setProperty(AS2Constants.AS2_INTERCHANGE, coreContext); + exchange.getIn().setBody(ediEntity.getEdiMessage()); // send message to next processor in the route getProcessor().process(exchange); } finally { // check if an exception occurred and was not handled exception = exchange.getException(); + releaseExchange(exchange, false); } } catch (Exception e) { LOG.warn("Failed to process AS2 message", e); diff --git a/components/camel-asterisk/src/main/java/org/apache/camel/component/asterisk/AsteriskConsumer.java b/components/camel-asterisk/src/main/java/org/apache/camel/component/asterisk/AsteriskConsumer.java index 24c8368..e650178 100644 --- a/components/camel-asterisk/src/main/java/org/apache/camel/component/asterisk/AsteriskConsumer.java +++ b/components/camel-asterisk/src/main/java/org/apache/camel/component/asterisk/AsteriskConsumer.java @@ -62,14 +62,15 @@ public class AsteriskConsumer extends DefaultConsumer { private final class EventListener implements ManagerEventListener { @Override public void onManagerEvent(ManagerEvent event) { - Exchange exchange = endpoint.createExchange(); - exchange.getIn().setHeader(AsteriskConstants.EVENT_NAME, event.getClass().getSimpleName()); - exchange.getIn().setBody(event); - + Exchange exchange = createExchange(false); try { + exchange.getIn().setHeader(AsteriskConstants.EVENT_NAME, event.getClass().getSimpleName()); + exchange.getIn().setBody(event); getProcessor().process(exchange); } catch (Exception e) { getExceptionHandler().handleException("Error processing exchange.", exchange, e); + } finally { + releaseExchange(exchange, false); } } } diff --git a/components/camel-atmos/src/main/java/org/apache/camel/component/atmos/integration/consumer/AtmosScheduledPollConsumer.java b/components/camel-atmos/src/main/java/org/apache/camel/component/atmos/integration/consumer/AtmosScheduledPollConsumer.java index 0c1bb56..950415f 100644 --- a/components/camel-atmos/src/main/java/org/apache/camel/component/atmos/integration/consumer/AtmosScheduledPollConsumer.java +++ b/components/camel-atmos/src/main/java/org/apache/camel/component/atmos/integration/consumer/AtmosScheduledPollConsumer.java @@ -38,8 +38,6 @@ public abstract class AtmosScheduledPollConsumer extends ScheduledPollConsumer { /** * Lifecycle method invoked when the consumer has created. Internally create or reuse a connection to the low level * atmos client - * - * @throws Exception */ @Override protected void doStart() throws Exception { @@ -53,8 +51,6 @@ public abstract class AtmosScheduledPollConsumer extends ScheduledPollConsumer { /** * Lifecycle method invoked when the consumer has destroyed. Erase the reference to the atmos low level client - * - * @throws Exception */ @Override protected void doStop() throws Exception { diff --git a/components/camel-atmos/src/main/java/org/apache/camel/component/atmos/integration/consumer/AtmosScheduledPollGetConsumer.java b/components/camel-atmos/src/main/java/org/apache/camel/component/atmos/integration/consumer/AtmosScheduledPollGetConsumer.java index e2d7f43..1f2fc02 100644 --- a/components/camel-atmos/src/main/java/org/apache/camel/component/atmos/integration/consumer/AtmosScheduledPollGetConsumer.java +++ b/components/camel-atmos/src/main/java/org/apache/camel/component/atmos/integration/consumer/AtmosScheduledPollGetConsumer.java @@ -32,17 +32,16 @@ public class AtmosScheduledPollGetConsumer extends AtmosScheduledPollConsumer { /** * Poll from an atmos remote path and put the result in the message exchange * - * @return number of messages polled - * @throws Exception + * @return number of messages polled */ @Override protected int poll() throws Exception { - Exchange exchange = endpoint.createExchange(); - AtmosResult result = AtmosAPIFacade.getInstance(configuration.getClient()) - .get(configuration.getRemotePath()); - result.populateExchange(exchange); - + Exchange exchange = createExchange(false); try { + AtmosResult result = AtmosAPIFacade.getInstance(configuration.getClient()) + .get(configuration.getRemotePath()); + result.populateExchange(exchange); + // send message to next processor in the route getProcessor().process(exchange); return 1; // number of messages polled @@ -51,6 +50,7 @@ public class AtmosScheduledPollGetConsumer extends AtmosScheduledPollConsumer { if (exchange.getException() != null) { getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException()); } + releaseExchange(exchange, false); } } } diff --git a/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketConsumer.java b/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketConsumer.java index 8af4199..8957b6a 100644 --- a/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketConsumer.java +++ b/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketConsumer.java @@ -84,7 +84,7 @@ public class WebsocketConsumer extends ServletConsumer { } public void sendMessage(final String connectionKey, Object message) { - final Exchange exchange = getEndpoint().createExchange(); + final Exchange exchange = createExchange(true); // set header and body exchange.getIn().setHeader(WebsocketConstants.CONNECTION_KEY, connectionKey); @@ -101,7 +101,7 @@ public class WebsocketConsumer extends ServletConsumer { } public void sendEventNotification(String connectionKey, int eventType) { - final Exchange exchange = getEndpoint().createExchange(); + final Exchange exchange = createExchange(true); // set header exchange.getIn().setHeader(WebsocketConstants.CONNECTION_KEY, connectionKey); @@ -122,7 +122,7 @@ public class WebsocketConsumer extends ServletConsumer { } public void sendNotDeliveredMessage(List<String> failedConnectionKeys, Object message) { - final Exchange exchange = getEndpoint().createExchange(); + final Exchange exchange = createExchange(true); // set header and body exchange.getIn().setHeader(WebsocketConstants.CONNECTION_KEY_LIST, failedConnectionKeys);
