This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch exchange-factory
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/exchange-factory by this push:
     new c046bb5  CAMEL-16222: PooledExchangeFactory experiment
c046bb5 is described below

commit c046bb55a20ffc27bf744098cc866f9171ee09c2
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Sun Feb 21 17:48:50 2021 +0100

    CAMEL-16222: PooledExchangeFactory experiment
---
 .../consul/endpoint/ConsulKeyValueConsumer.java    |  3 +-
 .../camel/component/dataset/DataSetConsumer.java   |  5 ---
 .../camel/component/debezium/DebeziumConsumer.java |  3 +-
 .../camel/component/debezium/DebeziumEndpoint.java |  9 ++++-
 .../component/debezium/DebeziumEndpointTest.java   | 12 +++---
 .../component/disruptor/DisruptorConsumer.java     |  2 +-
 .../docker/consumer/DockerEventsConsumer.java      | 23 +++++------
 .../docker/consumer/DockerStatsConsumer.java       | 23 +++++------
 .../consumer/DropboxScheduledPollConsumer.java     |  4 --
 .../consumer/DropboxScheduledPollGetConsumer.java  | 45 +++++++++++-----------
 .../DropboxScheduledPollSearchConsumer.java        | 33 ++++++++--------
 11 files changed, 74 insertions(+), 88 deletions(-)

diff --git 
a/components/camel-consul/src/main/java/org/apache/camel/component/consul/endpoint/ConsulKeyValueConsumer.java
 
b/components/camel-consul/src/main/java/org/apache/camel/component/consul/endpoint/ConsulKeyValueConsumer.java
index b7b8cb3..e3adb3a 100644
--- 
a/components/camel-consul/src/main/java/org/apache/camel/component/consul/endpoint/ConsulKeyValueConsumer.java
+++ 
b/components/camel-consul/src/main/java/org/apache/camel/component/consul/endpoint/ConsulKeyValueConsumer.java
@@ -87,7 +87,8 @@ public final class ConsulKeyValueConsumer extends 
AbstractConsulConsumer<KeyValu
                 }
 
                 message.setBody(
-                        configuration.isValueAsString() ? 
value.getValueAsString().orElse(null) : value.getValue().orElse(null));
+                        configuration.isValueAsString()
+                                ? value.getValueAsString().orElse(null) : 
value.getValue().orElse(null));
 
                 getProcessor().process(exchange);
             } catch (Exception e) {
diff --git 
a/components/camel-dataset/src/main/java/org/apache/camel/component/dataset/DataSetConsumer.java
 
b/components/camel-dataset/src/main/java/org/apache/camel/component/dataset/DataSetConsumer.java
index 84ef2cf..3fb132e 100644
--- 
a/components/camel-dataset/src/main/java/org/apache/camel/component/dataset/DataSetConsumer.java
+++ 
b/components/camel-dataset/src/main/java/org/apache/camel/component/dataset/DataSetConsumer.java
@@ -43,11 +43,6 @@ public class DataSetConsumer extends DefaultConsumer {
     }
 
     @Override
-    protected void doInit() throws Exception {
-        super.doInit();
-    }
-
-    @Override
     protected void doStart() throws Exception {
         super.doStart();
 
diff --git 
a/components/camel-debezium-common/camel-debezium-common-component/src/main/java/org/apache/camel/component/debezium/DebeziumConsumer.java
 
b/components/camel-debezium-common/camel-debezium-common-component/src/main/java/org/apache/camel/component/debezium/DebeziumConsumer.java
index dee39dd..2ed84a8 100644
--- 
a/components/camel-debezium-common/camel-debezium-common-component/src/main/java/org/apache/camel/component/debezium/DebeziumConsumer.java
+++ 
b/components/camel-debezium-common/camel-debezium-common-component/src/main/java/org/apache/camel/component/debezium/DebeziumConsumer.java
@@ -74,7 +74,7 @@ public class DebeziumConsumer extends DefaultConsumer {
     }
 
     private void onEventListener(final ChangeEvent<SourceRecord, SourceRecord> 
event) {
-        final Exchange exchange = endpoint.createDbzExchange(event.value());
+        final Exchange exchange = endpoint.createDbzExchange(this, 
event.value());
 
         try {
             // send message to next processor in the route
@@ -87,6 +87,7 @@ public class DebeziumConsumer extends DefaultConsumer {
                 getExceptionHandler().handleException("Error processing 
exchange", exchange,
                         exchange.getException());
             }
+            releaseExchange(exchange, false);
         }
     }
 }
diff --git 
a/components/camel-debezium-common/camel-debezium-common-component/src/main/java/org/apache/camel/component/debezium/DebeziumEndpoint.java
 
b/components/camel-debezium-common/camel-debezium-common-component/src/main/java/org/apache/camel/component/debezium/DebeziumEndpoint.java
index 10d5a36..9cb5e1a 100644
--- 
a/components/camel-debezium-common/camel-debezium-common-component/src/main/java/org/apache/camel/component/debezium/DebeziumEndpoint.java
+++ 
b/components/camel-debezium-common/camel-debezium-common-component/src/main/java/org/apache/camel/component/debezium/DebeziumEndpoint.java
@@ -63,8 +63,13 @@ public abstract class DebeziumEndpoint<C extends 
EmbeddedDebeziumConfiguration>
                 "DebeziumConsumer");
     }
 
-    public Exchange createDbzExchange(final SourceRecord record) {
-        final Exchange exchange = super.createExchange();
+    public Exchange createDbzExchange(DebeziumConsumer consumer, final 
SourceRecord record) {
+        final Exchange exchange;
+        if (consumer != null) {
+            exchange = consumer.createExchange(false);
+        } else {
+            exchange = super.createExchange();
+        }
 
         final Message message = exchange.getIn();
 
diff --git 
a/components/camel-debezium-common/camel-debezium-common-component/src/test/java/org/apache/camel/component/debezium/DebeziumEndpointTest.java
 
b/components/camel-debezium-common/camel-debezium-common-component/src/test/java/org/apache/camel/component/debezium/DebeziumEndpointTest.java
index 853e316..4c9c4ce 100644
--- 
a/components/camel-debezium-common/camel-debezium-common-component/src/test/java/org/apache/camel/component/debezium/DebeziumEndpointTest.java
+++ 
b/components/camel-debezium-common/camel-debezium-common-component/src/test/java/org/apache/camel/component/debezium/DebeziumEndpointTest.java
@@ -76,7 +76,7 @@ public class DebeziumEndpointTest {
     void testIfCreatesExchangeFromSourceCreateRecord() {
         final SourceRecord sourceRecord = createCreateRecord();
 
-        final Exchange exchange = 
debeziumEndpoint.createDbzExchange(sourceRecord);
+        final Exchange exchange = debeziumEndpoint.createDbzExchange(null, 
sourceRecord);
         final Message inMessage = exchange.getIn();
 
         assertNotNull(exchange);
@@ -102,7 +102,7 @@ public class DebeziumEndpointTest {
     void testIfCreatesExchangeFromSourceDeleteRecord() {
         final SourceRecord sourceRecord = createDeleteRecord();
 
-        final Exchange exchange = 
debeziumEndpoint.createDbzExchange(sourceRecord);
+        final Exchange exchange = debeziumEndpoint.createDbzExchange(null, 
sourceRecord);
         final Message inMessage = exchange.getIn();
 
         assertNotNull(exchange);
@@ -123,7 +123,7 @@ public class DebeziumEndpointTest {
     void testIfCreatesExchangeFromSourceDeleteRecordWithNull() {
         final SourceRecord sourceRecord = createDeleteRecordWithNull();
 
-        final Exchange exchange = 
debeziumEndpoint.createDbzExchange(sourceRecord);
+        final Exchange exchange = debeziumEndpoint.createDbzExchange(null, 
sourceRecord);
         final Message inMessage = exchange.getIn();
 
         assertNotNull(exchange);
@@ -141,7 +141,7 @@ public class DebeziumEndpointTest {
     void testIfCreatesExchangeFromSourceUpdateRecord() {
         final SourceRecord sourceRecord = createUpdateRecord();
 
-        final Exchange exchange = 
debeziumEndpoint.createDbzExchange(sourceRecord);
+        final Exchange exchange = debeziumEndpoint.createDbzExchange(null, 
sourceRecord);
         final Message inMessage = exchange.getIn();
 
         assertNotNull(exchange);
@@ -166,7 +166,7 @@ public class DebeziumEndpointTest {
     void testIfCreatesExchangeFromSourceRecordOtherThanStruct() {
         final SourceRecord sourceRecord = createStringRecord();
 
-        final Exchange exchange = 
debeziumEndpoint.createDbzExchange(sourceRecord);
+        final Exchange exchange = debeziumEndpoint.createDbzExchange(null, 
sourceRecord);
         final Message inMessage = exchange.getIn();
 
         assertNotNull(exchange);
@@ -184,7 +184,7 @@ public class DebeziumEndpointTest {
     void testIfHandlesUnknownSchema() {
         final SourceRecord sourceRecord = createUnknownUnnamedSchemaRecord();
 
-        final Exchange exchange = 
debeziumEndpoint.createDbzExchange(sourceRecord);
+        final Exchange exchange = debeziumEndpoint.createDbzExchange(null, 
sourceRecord);
         final Message inMessage = exchange.getIn();
 
         assertNotNull(exchange);
diff --git 
a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorConsumer.java
 
b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorConsumer.java
index 29cb921..b9734af 100644
--- 
a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorConsumer.java
+++ 
b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorConsumer.java
@@ -199,7 +199,7 @@ public class DisruptorConsumer extends ServiceSupport 
implements Consumer, Suspe
     }
 
     @Override
-    public void releaseExchange(Exchange exchange) {
+    public void releaseExchange(Exchange exchange, boolean autoRelease) {
         // noop
     }
 
diff --git 
a/components/camel-docker/src/main/java/org/apache/camel/component/docker/consumer/DockerEventsConsumer.java
 
b/components/camel-docker/src/main/java/org/apache/camel/component/docker/consumer/DockerEventsConsumer.java
index 005c085..4ab51f6 100644
--- 
a/components/camel-docker/src/main/java/org/apache/camel/component/docker/consumer/DockerEventsConsumer.java
+++ 
b/components/camel-docker/src/main/java/org/apache/camel/component/docker/consumer/DockerEventsConsumer.java
@@ -88,24 +88,19 @@ public class DockerEventsConsumer extends DefaultConsumer {
         public void onNext(Event event) {
             LOG.debug("Received Docker Event: {}", event);
 
-            final Exchange exchange = getEndpoint().createExchange();
+            final Exchange exchange = createExchange(true);
             Message message = exchange.getIn();
             message.setBody(event);
 
-            try {
-                LOG.trace("Processing exchange [{}]...", exchange);
-                getAsyncProcessor().process(exchange, new AsyncCallback() {
-                    @Override
-                    public void done(boolean doneSync) {
-                        LOG.trace("Done processing exchange [{}]...", 
exchange);
+            LOG.trace("Processing exchange [{}]...", exchange);
+            getAsyncProcessor().process(exchange, new AsyncCallback() {
+                @Override
+                public void done(boolean doneSync) {
+                    if (exchange.getException() != null) {
+                        getExceptionHandler().handleException("Error 
processing exchange", exchange, exchange.getException());
                     }
-                });
-            } catch (Exception e) {
-                exchange.setException(e);
-            }
-            if (exchange.getException() != null) {
-                getExceptionHandler().handleException("Error processing 
exchange", exchange, exchange.getException());
-            }
+                }
+            });
         }
     }
 }
diff --git 
a/components/camel-docker/src/main/java/org/apache/camel/component/docker/consumer/DockerStatsConsumer.java
 
b/components/camel-docker/src/main/java/org/apache/camel/component/docker/consumer/DockerStatsConsumer.java
index 268a823..4160f37 100644
--- 
a/components/camel-docker/src/main/java/org/apache/camel/component/docker/consumer/DockerStatsConsumer.java
+++ 
b/components/camel-docker/src/main/java/org/apache/camel/component/docker/consumer/DockerStatsConsumer.java
@@ -78,24 +78,19 @@ public class DockerStatsConsumer extends DefaultConsumer {
         public void onNext(Statistics statistics) {
             LOGGER.debug("Received Docker Statistics Event: {}", statistics);
 
-            final Exchange exchange = getEndpoint().createExchange();
+            final Exchange exchange = createExchange(true);
             Message message = exchange.getIn();
             message.setBody(statistics);
 
-            try {
-                LOGGER.trace("Processing exchange [{}]...", exchange);
-                getAsyncProcessor().process(exchange, new AsyncCallback() {
-                    @Override
-                    public void done(boolean doneSync) {
-                        LOGGER.trace("Done processing exchange [{}]...", 
exchange);
+            LOGGER.trace("Processing exchange [{}]...", exchange);
+            getAsyncProcessor().process(exchange, new AsyncCallback() {
+                @Override
+                public void done(boolean doneSync) {
+                    if (exchange.getException() != null) {
+                        getExceptionHandler().handleException("Error 
processing exchange", exchange, exchange.getException());
                     }
-                });
-            } catch (Exception e) {
-                exchange.setException(e);
-            }
-            if (exchange.getException() != null) {
-                getExceptionHandler().handleException("Error processing 
exchange", exchange, exchange.getException());
-            }
+                }
+            });
         }
     }
 }
diff --git 
a/components/camel-dropbox/src/main/java/org/apache/camel/component/dropbox/integration/consumer/DropboxScheduledPollConsumer.java
 
b/components/camel-dropbox/src/main/java/org/apache/camel/component/dropbox/integration/consumer/DropboxScheduledPollConsumer.java
index 5d80590..36f8252 100644
--- 
a/components/camel-dropbox/src/main/java/org/apache/camel/component/dropbox/integration/consumer/DropboxScheduledPollConsumer.java
+++ 
b/components/camel-dropbox/src/main/java/org/apache/camel/component/dropbox/integration/consumer/DropboxScheduledPollConsumer.java
@@ -40,8 +40,6 @@ public abstract class DropboxScheduledPollConsumer extends 
ScheduledPollConsumer
     /**
      * Lifecycle method invoked when the consumer has created. Internally 
create or reuse a connection to the low level
      * dropbox client
-     * 
-     * @throws Exception
      */
     @Override
     protected void doStart() throws Exception {
@@ -57,8 +55,6 @@ public abstract class DropboxScheduledPollConsumer extends 
ScheduledPollConsumer
 
     /**
      * Lifecycle method invoked when the consumer has destroyed. Erase the 
reference to the dropbox low level client
-     * 
-     * @throws Exception
      */
     @Override
     protected void doStop() throws Exception {
diff --git 
a/components/camel-dropbox/src/main/java/org/apache/camel/component/dropbox/integration/consumer/DropboxScheduledPollGetConsumer.java
 
b/components/camel-dropbox/src/main/java/org/apache/camel/component/dropbox/integration/consumer/DropboxScheduledPollGetConsumer.java
index 4097722..e624305 100644
--- 
a/components/camel-dropbox/src/main/java/org/apache/camel/component/dropbox/integration/consumer/DropboxScheduledPollGetConsumer.java
+++ 
b/components/camel-dropbox/src/main/java/org/apache/camel/component/dropbox/integration/consumer/DropboxScheduledPollGetConsumer.java
@@ -35,35 +35,33 @@ public class DropboxScheduledPollGetConsumer extends 
DropboxScheduledPollConsume
     /**
      * Poll from a dropbox remote path and put the result in the message 
exchange
      * 
-     * @return           number of messages polled
-     * @throws Exception
+     * @return number of messages polled
      */
     @Override
     protected int poll() throws Exception {
-        Exchange exchange = endpoint.createExchange();
-        DropboxFileDownloadResult result = new 
DropboxAPIFacade(configuration.getClient(), exchange)
-                .get(configuration.getRemotePath());
+        Exchange exchange = createExchange(false);
+        try {
+            DropboxFileDownloadResult result = new 
DropboxAPIFacade(configuration.getClient(), exchange)
+                    .get(configuration.getRemotePath());
 
-        Map<String, Object> map = result.getEntries();
-        if (map.size() == 1) {
-            for (Map.Entry<String, Object> entry : map.entrySet()) {
-                
exchange.getIn().setHeader(DropboxResultHeader.DOWNLOADED_FILE.name(), 
entry.getKey());
-                exchange.getIn().setBody(entry.getValue());
-            }
-        } else {
-            StringBuilder pathsExtracted = new StringBuilder();
-            for (Map.Entry<String, Object> entry : map.entrySet()) {
-                pathsExtracted.append(entry.getKey()).append("\n");
+            Map<String, Object> map = result.getEntries();
+            if (map.size() == 1) {
+                for (Map.Entry<String, Object> entry : map.entrySet()) {
+                    
exchange.getIn().setHeader(DropboxResultHeader.DOWNLOADED_FILE.name(), 
entry.getKey());
+                    exchange.getIn().setBody(entry.getValue());
+                }
+            } else {
+                StringBuilder pathsExtracted = new StringBuilder();
+                for (Map.Entry<String, Object> entry : map.entrySet()) {
+                    pathsExtracted.append(entry.getKey()).append("\n");
+                }
+                
exchange.getIn().setHeader(DropboxResultHeader.DOWNLOADED_FILES.name(), 
pathsExtracted.toString());
+                exchange.getIn().setBody(map);
             }
-            
exchange.getIn().setHeader(DropboxResultHeader.DOWNLOADED_FILES.name(), 
pathsExtracted.toString());
-            exchange.getIn().setBody(map);
-        }
 
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Downloaded: {}", result);
-        }
-
-        try {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Downloaded: {}", result);
+            }
             // send message to next processor in the route
             getProcessor().process(exchange);
             return 1; // number of messages polled
@@ -72,6 +70,7 @@ public class DropboxScheduledPollGetConsumer extends 
DropboxScheduledPollConsume
             if (exchange.getException() != null) {
                 getExceptionHandler().handleException("Error processing 
exchange", exchange, exchange.getException());
             }
+            releaseExchange(exchange, false);
         }
     }
 }
diff --git 
a/components/camel-dropbox/src/main/java/org/apache/camel/component/dropbox/integration/consumer/DropboxScheduledPollSearchConsumer.java
 
b/components/camel-dropbox/src/main/java/org/apache/camel/component/dropbox/integration/consumer/DropboxScheduledPollSearchConsumer.java
index 39c39e5..82b8017 100644
--- 
a/components/camel-dropbox/src/main/java/org/apache/camel/component/dropbox/integration/consumer/DropboxScheduledPollSearchConsumer.java
+++ 
b/components/camel-dropbox/src/main/java/org/apache/camel/component/dropbox/integration/consumer/DropboxScheduledPollSearchConsumer.java
@@ -35,29 +35,27 @@ public class DropboxScheduledPollSearchConsumer extends 
DropboxScheduledPollCons
     /**
      * Poll from a dropbox remote path and put the result in the message 
exchange
      * 
-     * @return           number of messages polled
-     * @throws Exception
+     * @return number of messages polled
      */
     @Override
     protected int poll() throws Exception {
-        Exchange exchange = endpoint.createExchange();
-        DropboxSearchResult result = new 
DropboxAPIFacade(configuration.getClient(), exchange)
-                .search(configuration.getRemotePath(), 
configuration.getQuery());
+        Exchange exchange = createExchange(false);
+        try {
+            DropboxSearchResult result = new 
DropboxAPIFacade(configuration.getClient(), exchange)
+                    .search(configuration.getRemotePath(), 
configuration.getQuery());
 
-        StringBuilder fileExtracted = new StringBuilder();
-        for (SearchMatch entry : result.getFound()) {
-            
fileExtracted.append(entry.getMetadata().getName()).append("-").append(entry.getMetadata().getPathDisplay())
-                    .append("\n");
-        }
+            StringBuilder fileExtracted = new StringBuilder();
+            for (SearchMatch entry : result.getFound()) {
+                
fileExtracted.append(entry.getMetadata().getName()).append("-").append(entry.getMetadata().getPathDisplay())
+                        .append("\n");
+            }
 
-        exchange.getIn().setHeader(DropboxResultHeader.FOUND_FILES.name(), 
fileExtracted.toString());
-        exchange.getIn().setBody(result.getFound());
+            exchange.getIn().setHeader(DropboxResultHeader.FOUND_FILES.name(), 
fileExtracted.toString());
+            exchange.getIn().setBody(result.getFound());
 
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Downloaded: {}", result);
-        }
-
-        try {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Downloaded: {}", result);
+            }
             // send message to next processor in the route
             getProcessor().process(exchange);
             return 1; // number of messages polled
@@ -66,6 +64,7 @@ public class DropboxScheduledPollSearchConsumer extends 
DropboxScheduledPollCons
             if (exchange.getException() != null) {
                 getExceptionHandler().handleException("Error processing 
exchange", exchange, exchange.getException());
             }
+            releaseExchange(exchange, false);
         }
     }
 }

Reply via email to