This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch exchange-factory in repository https://gitbox.apache.org/repos/asf/camel.git
commit f6d0603c6098bc210ef13ff1befc408b04eb40e6 Author: Claus Ibsen <[email protected]> AuthorDate: Sat Feb 20 17:07:51 2021 +0100 CAMEL-16222: PooledExchangeFactory experiment --- .../java/org/apache/camel/spi/ExchangeFactory.java | 11 ++++ .../camel/impl/engine/DefaultExchangeFactory.java | 18 ++++++- .../camel/impl/engine/PooledExchangeFactory.java | 63 ++++++++++------------ 3 files changed, 55 insertions(+), 37 deletions(-) diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java b/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java index 9bc469f..197d473 100644 --- a/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java +++ b/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java @@ -72,4 +72,15 @@ public interface ExchangeFactory { default boolean release(Exchange exchange) { return true; } + + /** + * Whether statistics is enabled. + */ + boolean isStatisticsEnabled(); + + /** + * Whether statistics is enabled. + */ + void setStatisticsEnabled(boolean statisticsEnabled); + } diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultExchangeFactory.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultExchangeFactory.java index a8db865..469fb7c 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultExchangeFactory.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultExchangeFactory.java @@ -16,14 +16,18 @@ */ package org.apache.camel.impl.engine; -import org.apache.camel.*; +import org.apache.camel.CamelContext; +import org.apache.camel.CamelContextAware; +import org.apache.camel.Consumer; +import org.apache.camel.Endpoint; +import org.apache.camel.Exchange; import org.apache.camel.spi.ExchangeFactory; import org.apache.camel.support.DefaultExchange; /** * Default {@link ExchangeFactory} that creates a new {@link Exchange} instance. */ -public class DefaultExchangeFactory implements ExchangeFactory, CamelContextAware { +public final class DefaultExchangeFactory implements ExchangeFactory, CamelContextAware { private CamelContext camelContext; @@ -53,4 +57,14 @@ public class DefaultExchangeFactory implements ExchangeFactory, CamelContextAwar return new DefaultExchange(fromEndpoint); } + @Override + public boolean isStatisticsEnabled() { + return false; + } + + @Override + public void setStatisticsEnabled(boolean statisticsEnabled) { + // not in use + } + } diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java index c5b51b4..4b228eb0 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java @@ -19,10 +19,17 @@ package org.apache.camel.impl.engine; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicLong; -import org.apache.camel.*; +import org.apache.camel.CamelContext; +import org.apache.camel.CamelContextAware; +import org.apache.camel.Consumer; +import org.apache.camel.Endpoint; +import org.apache.camel.Exchange; +import org.apache.camel.Experimental; +import org.apache.camel.NonManagedService; +import org.apache.camel.PooledExchange; +import org.apache.camel.StaticService; import org.apache.camel.spi.ExchangeFactory; import org.apache.camel.support.DefaultPooledExchange; -import org.apache.camel.support.SynchronizationAdapter; import org.apache.camel.support.service.ServiceSupport; import org.apache.camel.util.URISupport; import org.slf4j.Logger; @@ -32,7 +39,7 @@ import org.slf4j.LoggerFactory; * Pooled {@link ExchangeFactory} that reuses {@link Exchange} instance from a pool. */ @Experimental -public class PooledExchangeFactory extends ServiceSupport +public final class PooledExchangeFactory extends ServiceSupport implements ExchangeFactory, CamelContextAware, StaticService, NonManagedService { // TODO: optimize onDone lambdas as they will be created per instance, and we can use static linked @@ -40,7 +47,6 @@ public class PooledExchangeFactory extends ServiceSupport private static final Logger LOG = LoggerFactory.getLogger(PooledExchangeFactory.class); private final Consumer consumer; - private final ReleaseOnCompletion onCompletion = new ReleaseOnCompletion(); private final ConcurrentLinkedQueue<Exchange> pool = new ConcurrentLinkedQueue<>(); private final AtomicLong acquired = new AtomicLong(); private final AtomicLong created = new AtomicLong(); @@ -48,7 +54,7 @@ public class PooledExchangeFactory extends ServiceSupport private final AtomicLong discarded = new AtomicLong(); private CamelContext camelContext; - private boolean statisticsEnabled = true; + private boolean statisticsEnabled; public PooledExchangeFactory() { this.consumer = null; @@ -91,13 +97,7 @@ public class PooledExchangeFactory extends ServiceSupport created.incrementAndGet(); } // create a new exchange as there was no free from the pool - PooledExchange answer = new DefaultPooledExchange(camelContext); - answer.setAutoRelease(autoRelease); - if (autoRelease) { - // the consumer will either always be in auto release mode or not, so its safe to initialize the task only once when the exchange is created - answer.onDone(this::release); - } - return answer; + exchange = createPooledExchange(null, autoRelease); } else { if (statisticsEnabled) { acquired.incrementAndGet(); @@ -117,13 +117,7 @@ public class PooledExchangeFactory extends ServiceSupport created.incrementAndGet(); } // create a new exchange as there was no free from the pool - PooledExchange answer = new DefaultPooledExchange(fromEndpoint); - answer.setAutoRelease(autoRelease); - if (autoRelease) { - // the consumer will either always be in auto release mode or not, so its safe to initialize the task only once when the exchange is created - answer.onDone(this::release); - } - return answer; + exchange = new DefaultPooledExchange(fromEndpoint); } else { if (statisticsEnabled) { acquired.incrementAndGet(); @@ -158,6 +152,21 @@ public class PooledExchangeFactory extends ServiceSupport } } + protected PooledExchange createPooledExchange(Endpoint fromEndpoint, boolean autoRelease) { + PooledExchange answer = null; + if (fromEndpoint != null) { + answer = new DefaultPooledExchange(fromEndpoint); + } else { + answer = new DefaultPooledExchange(camelContext); + } + answer.setAutoRelease(autoRelease); + if (autoRelease) { + // the consumer will either always be in auto release mode or not, so its safe to initialize the task only once when the exchange is created + answer.onDone(this::release); + } + return answer; + } + @Override protected void doStop() throws Exception { pool.clear(); @@ -180,20 +189,4 @@ public class PooledExchangeFactory extends ServiceSupport discarded.set(0); } - private final class ReleaseOnCompletion extends SynchronizationAdapter { - - @Override - public int getOrder() { - // should be very very last so set as highest value possible - return Integer.MAX_VALUE; - } - - @Override - public void onDone(Exchange exchange) { - if (exchange != null) { - release(exchange); - } - } - } - }
