This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch exchange-factory in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/exchange-factory by this push: new c046bb5 CAMEL-16222: PooledExchangeFactory experiment c046bb5 is described below commit c046bb55a20ffc27bf744098cc866f9171ee09c2 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Sun Feb 21 17:48:50 2021 +0100 CAMEL-16222: PooledExchangeFactory experiment --- .../consul/endpoint/ConsulKeyValueConsumer.java | 3 +- .../camel/component/dataset/DataSetConsumer.java | 5 --- .../camel/component/debezium/DebeziumConsumer.java | 3 +- .../camel/component/debezium/DebeziumEndpoint.java | 9 ++++- .../component/debezium/DebeziumEndpointTest.java | 12 +++--- .../component/disruptor/DisruptorConsumer.java | 2 +- .../docker/consumer/DockerEventsConsumer.java | 23 +++++------ .../docker/consumer/DockerStatsConsumer.java | 23 +++++------ .../consumer/DropboxScheduledPollConsumer.java | 4 -- .../consumer/DropboxScheduledPollGetConsumer.java | 45 +++++++++++----------- .../DropboxScheduledPollSearchConsumer.java | 33 ++++++++-------- 11 files changed, 74 insertions(+), 88 deletions(-) diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/endpoint/ConsulKeyValueConsumer.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/endpoint/ConsulKeyValueConsumer.java index b7b8cb3..e3adb3a 100644 --- a/components/camel-consul/src/main/java/org/apache/camel/component/consul/endpoint/ConsulKeyValueConsumer.java +++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/endpoint/ConsulKeyValueConsumer.java @@ -87,7 +87,8 @@ public final class ConsulKeyValueConsumer extends AbstractConsulConsumer<KeyValu } message.setBody( - configuration.isValueAsString() ? value.getValueAsString().orElse(null) : value.getValue().orElse(null)); + configuration.isValueAsString() + ? value.getValueAsString().orElse(null) : value.getValue().orElse(null)); getProcessor().process(exchange); } catch (Exception e) { diff --git a/components/camel-dataset/src/main/java/org/apache/camel/component/dataset/DataSetConsumer.java b/components/camel-dataset/src/main/java/org/apache/camel/component/dataset/DataSetConsumer.java index 84ef2cf..3fb132e 100644 --- a/components/camel-dataset/src/main/java/org/apache/camel/component/dataset/DataSetConsumer.java +++ b/components/camel-dataset/src/main/java/org/apache/camel/component/dataset/DataSetConsumer.java @@ -43,11 +43,6 @@ public class DataSetConsumer extends DefaultConsumer { } @Override - protected void doInit() throws Exception { - super.doInit(); - } - - @Override protected void doStart() throws Exception { super.doStart(); diff --git a/components/camel-debezium-common/camel-debezium-common-component/src/main/java/org/apache/camel/component/debezium/DebeziumConsumer.java b/components/camel-debezium-common/camel-debezium-common-component/src/main/java/org/apache/camel/component/debezium/DebeziumConsumer.java index dee39dd..2ed84a8 100644 --- a/components/camel-debezium-common/camel-debezium-common-component/src/main/java/org/apache/camel/component/debezium/DebeziumConsumer.java +++ b/components/camel-debezium-common/camel-debezium-common-component/src/main/java/org/apache/camel/component/debezium/DebeziumConsumer.java @@ -74,7 +74,7 @@ public class DebeziumConsumer extends DefaultConsumer { } private void onEventListener(final ChangeEvent<SourceRecord, SourceRecord> event) { - final Exchange exchange = endpoint.createDbzExchange(event.value()); + final Exchange exchange = endpoint.createDbzExchange(this, event.value()); try { // send message to next processor in the route @@ -87,6 +87,7 @@ public class DebeziumConsumer extends DefaultConsumer { getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException()); } + releaseExchange(exchange, false); } } } diff --git a/components/camel-debezium-common/camel-debezium-common-component/src/main/java/org/apache/camel/component/debezium/DebeziumEndpoint.java b/components/camel-debezium-common/camel-debezium-common-component/src/main/java/org/apache/camel/component/debezium/DebeziumEndpoint.java index 10d5a36..9cb5e1a 100644 --- a/components/camel-debezium-common/camel-debezium-common-component/src/main/java/org/apache/camel/component/debezium/DebeziumEndpoint.java +++ b/components/camel-debezium-common/camel-debezium-common-component/src/main/java/org/apache/camel/component/debezium/DebeziumEndpoint.java @@ -63,8 +63,13 @@ public abstract class DebeziumEndpoint<C extends EmbeddedDebeziumConfiguration> "DebeziumConsumer"); } - public Exchange createDbzExchange(final SourceRecord record) { - final Exchange exchange = super.createExchange(); + public Exchange createDbzExchange(DebeziumConsumer consumer, final SourceRecord record) { + final Exchange exchange; + if (consumer != null) { + exchange = consumer.createExchange(false); + } else { + exchange = super.createExchange(); + } final Message message = exchange.getIn(); diff --git a/components/camel-debezium-common/camel-debezium-common-component/src/test/java/org/apache/camel/component/debezium/DebeziumEndpointTest.java b/components/camel-debezium-common/camel-debezium-common-component/src/test/java/org/apache/camel/component/debezium/DebeziumEndpointTest.java index 853e316..4c9c4ce 100644 --- a/components/camel-debezium-common/camel-debezium-common-component/src/test/java/org/apache/camel/component/debezium/DebeziumEndpointTest.java +++ b/components/camel-debezium-common/camel-debezium-common-component/src/test/java/org/apache/camel/component/debezium/DebeziumEndpointTest.java @@ -76,7 +76,7 @@ public class DebeziumEndpointTest { void testIfCreatesExchangeFromSourceCreateRecord() { final SourceRecord sourceRecord = createCreateRecord(); - final Exchange exchange = debeziumEndpoint.createDbzExchange(sourceRecord); + final Exchange exchange = debeziumEndpoint.createDbzExchange(null, sourceRecord); final Message inMessage = exchange.getIn(); assertNotNull(exchange); @@ -102,7 +102,7 @@ public class DebeziumEndpointTest { void testIfCreatesExchangeFromSourceDeleteRecord() { final SourceRecord sourceRecord = createDeleteRecord(); - final Exchange exchange = debeziumEndpoint.createDbzExchange(sourceRecord); + final Exchange exchange = debeziumEndpoint.createDbzExchange(null, sourceRecord); final Message inMessage = exchange.getIn(); assertNotNull(exchange); @@ -123,7 +123,7 @@ public class DebeziumEndpointTest { void testIfCreatesExchangeFromSourceDeleteRecordWithNull() { final SourceRecord sourceRecord = createDeleteRecordWithNull(); - final Exchange exchange = debeziumEndpoint.createDbzExchange(sourceRecord); + final Exchange exchange = debeziumEndpoint.createDbzExchange(null, sourceRecord); final Message inMessage = exchange.getIn(); assertNotNull(exchange); @@ -141,7 +141,7 @@ public class DebeziumEndpointTest { void testIfCreatesExchangeFromSourceUpdateRecord() { final SourceRecord sourceRecord = createUpdateRecord(); - final Exchange exchange = debeziumEndpoint.createDbzExchange(sourceRecord); + final Exchange exchange = debeziumEndpoint.createDbzExchange(null, sourceRecord); final Message inMessage = exchange.getIn(); assertNotNull(exchange); @@ -166,7 +166,7 @@ public class DebeziumEndpointTest { void testIfCreatesExchangeFromSourceRecordOtherThanStruct() { final SourceRecord sourceRecord = createStringRecord(); - final Exchange exchange = debeziumEndpoint.createDbzExchange(sourceRecord); + final Exchange exchange = debeziumEndpoint.createDbzExchange(null, sourceRecord); final Message inMessage = exchange.getIn(); assertNotNull(exchange); @@ -184,7 +184,7 @@ public class DebeziumEndpointTest { void testIfHandlesUnknownSchema() { final SourceRecord sourceRecord = createUnknownUnnamedSchemaRecord(); - final Exchange exchange = debeziumEndpoint.createDbzExchange(sourceRecord); + final Exchange exchange = debeziumEndpoint.createDbzExchange(null, sourceRecord); final Message inMessage = exchange.getIn(); assertNotNull(exchange); diff --git a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorConsumer.java b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorConsumer.java index 29cb921..b9734af 100644 --- a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorConsumer.java +++ b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorConsumer.java @@ -199,7 +199,7 @@ public class DisruptorConsumer extends ServiceSupport implements Consumer, Suspe } @Override - public void releaseExchange(Exchange exchange) { + public void releaseExchange(Exchange exchange, boolean autoRelease) { // noop } diff --git a/components/camel-docker/src/main/java/org/apache/camel/component/docker/consumer/DockerEventsConsumer.java b/components/camel-docker/src/main/java/org/apache/camel/component/docker/consumer/DockerEventsConsumer.java index 005c085..4ab51f6 100644 --- a/components/camel-docker/src/main/java/org/apache/camel/component/docker/consumer/DockerEventsConsumer.java +++ b/components/camel-docker/src/main/java/org/apache/camel/component/docker/consumer/DockerEventsConsumer.java @@ -88,24 +88,19 @@ public class DockerEventsConsumer extends DefaultConsumer { public void onNext(Event event) { LOG.debug("Received Docker Event: {}", event); - final Exchange exchange = getEndpoint().createExchange(); + final Exchange exchange = createExchange(true); Message message = exchange.getIn(); message.setBody(event); - try { - LOG.trace("Processing exchange [{}]...", exchange); - getAsyncProcessor().process(exchange, new AsyncCallback() { - @Override - public void done(boolean doneSync) { - LOG.trace("Done processing exchange [{}]...", exchange); + LOG.trace("Processing exchange [{}]...", exchange); + getAsyncProcessor().process(exchange, new AsyncCallback() { + @Override + public void done(boolean doneSync) { + if (exchange.getException() != null) { + getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException()); } - }); - } catch (Exception e) { - exchange.setException(e); - } - if (exchange.getException() != null) { - getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException()); - } + } + }); } } } diff --git a/components/camel-docker/src/main/java/org/apache/camel/component/docker/consumer/DockerStatsConsumer.java b/components/camel-docker/src/main/java/org/apache/camel/component/docker/consumer/DockerStatsConsumer.java index 268a823..4160f37 100644 --- a/components/camel-docker/src/main/java/org/apache/camel/component/docker/consumer/DockerStatsConsumer.java +++ b/components/camel-docker/src/main/java/org/apache/camel/component/docker/consumer/DockerStatsConsumer.java @@ -78,24 +78,19 @@ public class DockerStatsConsumer extends DefaultConsumer { public void onNext(Statistics statistics) { LOGGER.debug("Received Docker Statistics Event: {}", statistics); - final Exchange exchange = getEndpoint().createExchange(); + final Exchange exchange = createExchange(true); Message message = exchange.getIn(); message.setBody(statistics); - try { - LOGGER.trace("Processing exchange [{}]...", exchange); - getAsyncProcessor().process(exchange, new AsyncCallback() { - @Override - public void done(boolean doneSync) { - LOGGER.trace("Done processing exchange [{}]...", exchange); + LOGGER.trace("Processing exchange [{}]...", exchange); + getAsyncProcessor().process(exchange, new AsyncCallback() { + @Override + public void done(boolean doneSync) { + if (exchange.getException() != null) { + getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException()); } - }); - } catch (Exception e) { - exchange.setException(e); - } - if (exchange.getException() != null) { - getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException()); - } + } + }); } } } diff --git a/components/camel-dropbox/src/main/java/org/apache/camel/component/dropbox/integration/consumer/DropboxScheduledPollConsumer.java b/components/camel-dropbox/src/main/java/org/apache/camel/component/dropbox/integration/consumer/DropboxScheduledPollConsumer.java index 5d80590..36f8252 100644 --- a/components/camel-dropbox/src/main/java/org/apache/camel/component/dropbox/integration/consumer/DropboxScheduledPollConsumer.java +++ b/components/camel-dropbox/src/main/java/org/apache/camel/component/dropbox/integration/consumer/DropboxScheduledPollConsumer.java @@ -40,8 +40,6 @@ public abstract class DropboxScheduledPollConsumer extends ScheduledPollConsumer /** * Lifecycle method invoked when the consumer has created. Internally create or reuse a connection to the low level * dropbox client - * - * @throws Exception */ @Override protected void doStart() throws Exception { @@ -57,8 +55,6 @@ public abstract class DropboxScheduledPollConsumer extends ScheduledPollConsumer /** * Lifecycle method invoked when the consumer has destroyed. Erase the reference to the dropbox low level client - * - * @throws Exception */ @Override protected void doStop() throws Exception { diff --git a/components/camel-dropbox/src/main/java/org/apache/camel/component/dropbox/integration/consumer/DropboxScheduledPollGetConsumer.java b/components/camel-dropbox/src/main/java/org/apache/camel/component/dropbox/integration/consumer/DropboxScheduledPollGetConsumer.java index 4097722..e624305 100644 --- a/components/camel-dropbox/src/main/java/org/apache/camel/component/dropbox/integration/consumer/DropboxScheduledPollGetConsumer.java +++ b/components/camel-dropbox/src/main/java/org/apache/camel/component/dropbox/integration/consumer/DropboxScheduledPollGetConsumer.java @@ -35,35 +35,33 @@ public class DropboxScheduledPollGetConsumer extends DropboxScheduledPollConsume /** * Poll from a dropbox remote path and put the result in the message exchange * - * @return number of messages polled - * @throws Exception + * @return number of messages polled */ @Override protected int poll() throws Exception { - Exchange exchange = endpoint.createExchange(); - DropboxFileDownloadResult result = new DropboxAPIFacade(configuration.getClient(), exchange) - .get(configuration.getRemotePath()); + Exchange exchange = createExchange(false); + try { + DropboxFileDownloadResult result = new DropboxAPIFacade(configuration.getClient(), exchange) + .get(configuration.getRemotePath()); - Map<String, Object> map = result.getEntries(); - if (map.size() == 1) { - for (Map.Entry<String, Object> entry : map.entrySet()) { - exchange.getIn().setHeader(DropboxResultHeader.DOWNLOADED_FILE.name(), entry.getKey()); - exchange.getIn().setBody(entry.getValue()); - } - } else { - StringBuilder pathsExtracted = new StringBuilder(); - for (Map.Entry<String, Object> entry : map.entrySet()) { - pathsExtracted.append(entry.getKey()).append("\n"); + Map<String, Object> map = result.getEntries(); + if (map.size() == 1) { + for (Map.Entry<String, Object> entry : map.entrySet()) { + exchange.getIn().setHeader(DropboxResultHeader.DOWNLOADED_FILE.name(), entry.getKey()); + exchange.getIn().setBody(entry.getValue()); + } + } else { + StringBuilder pathsExtracted = new StringBuilder(); + for (Map.Entry<String, Object> entry : map.entrySet()) { + pathsExtracted.append(entry.getKey()).append("\n"); + } + exchange.getIn().setHeader(DropboxResultHeader.DOWNLOADED_FILES.name(), pathsExtracted.toString()); + exchange.getIn().setBody(map); } - exchange.getIn().setHeader(DropboxResultHeader.DOWNLOADED_FILES.name(), pathsExtracted.toString()); - exchange.getIn().setBody(map); - } - if (LOG.isDebugEnabled()) { - LOG.debug("Downloaded: {}", result); - } - - try { + if (LOG.isDebugEnabled()) { + LOG.debug("Downloaded: {}", result); + } // send message to next processor in the route getProcessor().process(exchange); return 1; // number of messages polled @@ -72,6 +70,7 @@ public class DropboxScheduledPollGetConsumer extends DropboxScheduledPollConsume if (exchange.getException() != null) { getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException()); } + releaseExchange(exchange, false); } } } diff --git a/components/camel-dropbox/src/main/java/org/apache/camel/component/dropbox/integration/consumer/DropboxScheduledPollSearchConsumer.java b/components/camel-dropbox/src/main/java/org/apache/camel/component/dropbox/integration/consumer/DropboxScheduledPollSearchConsumer.java index 39c39e5..82b8017 100644 --- a/components/camel-dropbox/src/main/java/org/apache/camel/component/dropbox/integration/consumer/DropboxScheduledPollSearchConsumer.java +++ b/components/camel-dropbox/src/main/java/org/apache/camel/component/dropbox/integration/consumer/DropboxScheduledPollSearchConsumer.java @@ -35,29 +35,27 @@ public class DropboxScheduledPollSearchConsumer extends DropboxScheduledPollCons /** * Poll from a dropbox remote path and put the result in the message exchange * - * @return number of messages polled - * @throws Exception + * @return number of messages polled */ @Override protected int poll() throws Exception { - Exchange exchange = endpoint.createExchange(); - DropboxSearchResult result = new DropboxAPIFacade(configuration.getClient(), exchange) - .search(configuration.getRemotePath(), configuration.getQuery()); + Exchange exchange = createExchange(false); + try { + DropboxSearchResult result = new DropboxAPIFacade(configuration.getClient(), exchange) + .search(configuration.getRemotePath(), configuration.getQuery()); - StringBuilder fileExtracted = new StringBuilder(); - for (SearchMatch entry : result.getFound()) { - fileExtracted.append(entry.getMetadata().getName()).append("-").append(entry.getMetadata().getPathDisplay()) - .append("\n"); - } + StringBuilder fileExtracted = new StringBuilder(); + for (SearchMatch entry : result.getFound()) { + fileExtracted.append(entry.getMetadata().getName()).append("-").append(entry.getMetadata().getPathDisplay()) + .append("\n"); + } - exchange.getIn().setHeader(DropboxResultHeader.FOUND_FILES.name(), fileExtracted.toString()); - exchange.getIn().setBody(result.getFound()); + exchange.getIn().setHeader(DropboxResultHeader.FOUND_FILES.name(), fileExtracted.toString()); + exchange.getIn().setBody(result.getFound()); - if (LOG.isDebugEnabled()) { - LOG.debug("Downloaded: {}", result); - } - - try { + if (LOG.isDebugEnabled()) { + LOG.debug("Downloaded: {}", result); + } // send message to next processor in the route getProcessor().process(exchange); return 1; // number of messages polled @@ -66,6 +64,7 @@ public class DropboxScheduledPollSearchConsumer extends DropboxScheduledPollCons if (exchange.getException() != null) { getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException()); } + releaseExchange(exchange, false); } } }