This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch exchange-factory in repository https://gitbox.apache.org/repos/asf/camel.git
commit 7db81b5e9914338b16d6a696161bf39f8e3d3527 Author: Claus Ibsen <[email protected]> AuthorDate: Fri Feb 19 13:59:17 2021 +0100 CAMEL-16222: PooledExchangeFactory experiment --- .../java/org/apache/camel/ExtendedExchange.java | 14 ++- .../java/org/apache/camel/spi/ExchangeFactory.java | 10 +- .../main/java/org/apache/camel/spi/UnitOfWork.java | 8 +- .../camel/impl/engine/CamelInternalProcessor.java | 10 +- .../camel/impl/engine/DefaultUnitOfWork.java | 104 +++++++++++---------- .../apache/camel/impl/engine/MDCUnitOfWork.java | 7 -- .../camel/impl/engine/PooledExchangeFactory.java | 49 ++++++---- .../camel/impl/engine/SimpleCamelContext.java | 2 +- .../org/apache/camel/support/DefaultConsumer.java | 1 - .../org/apache/camel/support/DefaultExchange.java | 94 +++++++++++-------- .../org/apache/camel/support/UnitOfWorkHelper.java | 12 +-- 11 files changed, 168 insertions(+), 143 deletions(-) diff --git a/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java b/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java index 9232ceb..36fb8b9 100644 --- a/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java +++ b/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java @@ -18,6 +18,7 @@ package org.apache.camel; import java.util.List; import java.util.Map; +import java.util.function.Function; import org.apache.camel.spi.Synchronization; import org.apache.camel.spi.UnitOfWork; @@ -29,16 +30,21 @@ import org.apache.camel.spi.UnitOfWork; public interface ExtendedExchange extends Exchange { /** - * Clears the exchange from user data so it may be reused. + * Registers a task to run when this exchange is done. + */ + void onDone(Function<Exchange, Boolean> task); + + /** + * When the exchange is done being used. * <p/> * <b>Important:</b> This API is NOT intended for Camel end users, but used internally by Camel itself. */ - void reset(); + void done(); /** - * Sets the created timestamp + * Resets the exchange for reuse with the given created timestamp; */ - void setCreated(long created); + void reset(long created); /** * Sets the endpoint which originated this message exchange. This method should typically only be called by diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java b/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java index 592f431..34ef0cf 100644 --- a/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java +++ b/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java @@ -67,7 +67,13 @@ public interface ExchangeFactory { */ Exchange create(Endpoint fromEndpoint, boolean autoRelease); - default void release(Exchange exchange) { - // noop + /** + * Releases the exchange back into the pool + * + * @param exchange the exchange + * @return true if released into the pool, or false if something went wrong and the exchange was discarded + */ + default boolean release(Exchange exchange) { + return true; } } diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/UnitOfWork.java b/core/camel-api/src/main/java/org/apache/camel/spi/UnitOfWork.java index e94840a..a6f118e 100644 --- a/core/camel-api/src/main/java/org/apache/camel/spi/UnitOfWork.java +++ b/core/camel-api/src/main/java/org/apache/camel/spi/UnitOfWork.java @@ -23,13 +23,12 @@ import org.apache.camel.Exchange; import org.apache.camel.Message; import org.apache.camel.Processor; import org.apache.camel.Route; -import org.apache.camel.Service; /** * An object representing the unit of work processing an {@link Exchange} which allows the use of * {@link Synchronization} hooks. This object might map one-to-one with a transaction in JPA or Spring; or might not. */ -public interface UnitOfWork extends Service { +public interface UnitOfWork { String MDC_BREADCRUMB_ID = "camel.breadcrumbId"; String MDC_EXCHANGE_ID = "camel.exchangeId"; @@ -50,9 +49,10 @@ public interface UnitOfWork extends Service { /** * Prepares this unit of work with the given input {@link Exchange} * - * @param exchange the exchange + * @param exchange the exchange + * @return true if the unit of work was created and prepared, false if already prepared */ - void onExchange(Exchange exchange); + boolean onPrepare(Exchange exchange); /** * Adds a synchronization hook diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java index 255b894..9696d1b 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java @@ -658,11 +658,17 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In created = createUnitOfWork(exchange); ExtendedExchange ee = (ExtendedExchange) exchange; ee.setUnitOfWork(created); - created.start(); uow = created; } else { // reuse existing exchange - uow.onExchange(exchange); + if (uow.onPrepare(exchange)) { + // need to re-attach uow + ExtendedExchange ee = (ExtendedExchange) exchange; + ee.setUnitOfWork(uow); + // we are prepared for reuse and can regard it as-if we created the unit of work + // so the after method knows that this is the outer bounds and should done the unit of work + created = uow; + } } // for any exchange we should push/pop route context so we can keep track of which route we are routing diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java index 7e51696..d501526 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java @@ -32,7 +32,6 @@ import org.apache.camel.ExtendedExchange; import org.apache.camel.Message; import org.apache.camel.Processor; import org.apache.camel.Route; -import org.apache.camel.Service; import org.apache.camel.spi.InflightRepository; import org.apache.camel.spi.Synchronization; import org.apache.camel.spi.SynchronizationVetoable; @@ -47,7 +46,7 @@ import org.slf4j.LoggerFactory; /** * The default implementation of {@link org.apache.camel.spi.UnitOfWork} */ -public class DefaultUnitOfWork implements UnitOfWork, Service { +public class DefaultUnitOfWork implements UnitOfWork { private static final Logger LOG = LoggerFactory.getLogger(DefaultUnitOfWork.class); // instances used by MDCUnitOfWork @@ -81,7 +80,7 @@ public class DefaultUnitOfWork implements UnitOfWork, Service { this.useBreadcrumb = useBreadcrumb; this.context = (ExtendedCamelContext) exchange.getContext(); this.inflightRepository = inflightRepository; - onExchange(exchange); + doOnPrepare(exchange); } UnitOfWork newInstance(Exchange exchange) { @@ -89,50 +88,57 @@ public class DefaultUnitOfWork implements UnitOfWork, Service { } @Override - public void onExchange(Exchange exchange) { + public boolean onPrepare(Exchange exchange) { if (this.exchange == null) { - // unit of work is reused, so setup for this exchange - this.exchange = exchange; - - if (allowUseOriginalMessage) { - // special for JmsMessage as it can cause it to loose headers later. - if (exchange.getIn().getClass().getName().equals("org.apache.camel.component.jms.JmsMessage")) { - this.originalInMessage = new DefaultMessage(context); - this.originalInMessage.setBody(exchange.getIn().getBody()); - this.originalInMessage.getHeaders().putAll(exchange.getIn().getHeaders()); - } else { - this.originalInMessage = exchange.getIn().copy(); - } - // must preserve exchange on the original in message - if (this.originalInMessage instanceof MessageSupport) { - ((MessageSupport) this.originalInMessage).setExchange(exchange); - } - } + doOnPrepare(exchange); + return true; + } else { + return false; + } + } - // inject breadcrumb header if enabled - if (useBreadcrumb) { - // create or use existing breadcrumb - String breadcrumbId = exchange.getIn().getHeader(Exchange.BREADCRUMB_ID, String.class); - if (breadcrumbId == null) { - // no existing breadcrumb, so create a new one based on the exchange id - breadcrumbId = exchange.getExchangeId(); - exchange.getIn().setHeader(Exchange.BREADCRUMB_ID, breadcrumbId); - } + private void doOnPrepare(Exchange exchange) { + // unit of work is reused, so setup for this exchange + this.exchange = exchange; + + if (allowUseOriginalMessage) { + // special for JmsMessage as it can cause it to loose headers later. + if (exchange.getIn().getClass().getName().equals("org.apache.camel.component.jms.JmsMessage")) { + this.originalInMessage = new DefaultMessage(context); + this.originalInMessage.setBody(exchange.getIn().getBody()); + this.originalInMessage.getHeaders().putAll(exchange.getIn().getHeaders()); + } else { + this.originalInMessage = exchange.getIn().copy(); + } + // must preserve exchange on the original in message + if (this.originalInMessage instanceof MessageSupport) { + ((MessageSupport) this.originalInMessage).setExchange(exchange); } + } - // fire event - if (context.isEventNotificationApplicable()) { - try { - EventHelper.notifyExchangeCreated(context, exchange); - } catch (Throwable e) { - // must catch exceptions to ensure the exchange is not failing due to notification event failed - log.warn("Exception occurred during event notification. This exception will be ignored.", e); - } + // inject breadcrumb header if enabled + if (useBreadcrumb) { + // create or use existing breadcrumb + String breadcrumbId = exchange.getIn().getHeader(Exchange.BREADCRUMB_ID, String.class); + if (breadcrumbId == null) { + // no existing breadcrumb, so create a new one based on the exchange id + breadcrumbId = exchange.getExchangeId(); + exchange.getIn().setHeader(Exchange.BREADCRUMB_ID, breadcrumbId); } + } - // register to inflight registry - inflightRepository.add(exchange); + // fire event + if (context.isEventNotificationApplicable()) { + try { + EventHelper.notifyExchangeCreated(context, exchange); + } catch (Throwable e) { + // must catch exceptions to ensure the exchange is not failing due to notification event failed + log.warn("Exception occurred during event notification. This exception will be ignored.", e); + } } + + // register to inflight registry + inflightRepository.add(exchange); } @Override @@ -161,16 +167,6 @@ public class DefaultUnitOfWork implements UnitOfWork, Service { } @Override - public void start() { - // noop - } - - @Override - public void stop() { - // noop - } - - @Override public synchronized void addSynchronization(Synchronization synchronization) { if (synchronizations == null) { synchronizations = new ArrayList<>(8); @@ -250,6 +246,14 @@ public class DefaultUnitOfWork implements UnitOfWork, Service { log.warn("Exception occurred during event notification. This exception will be ignored.", e); } } + + // the exchange is now done + try { + exchange.adapt(ExtendedExchange.class).done(); + } catch (Throwable e) { + // must catch exceptions to ensure synchronizations is also invoked + log.warn("Exception occurred during exchange done. This exception will be ignored.", e); + } } @Override diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/MDCUnitOfWork.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/MDCUnitOfWork.java index d111302..64d34f6 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/MDCUnitOfWork.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/MDCUnitOfWork.java @@ -87,13 +87,6 @@ public class MDCUnitOfWork extends DefaultUnitOfWork { } @Override - public void stop() { - super.stop(); - // and remove when stopping - clear(); - } - - @Override public void pushRoute(Route route) { super.pushRoute(route); if (route != null) { diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java index e1d45fa..6f44fb1 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java @@ -19,7 +19,15 @@ package org.apache.camel.impl.engine; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicLong; -import org.apache.camel.*; +import org.apache.camel.CamelContext; +import org.apache.camel.CamelContextAware; +import org.apache.camel.Consumer; +import org.apache.camel.Endpoint; +import org.apache.camel.Exchange; +import org.apache.camel.Experimental; +import org.apache.camel.ExtendedExchange; +import org.apache.camel.NonManagedService; +import org.apache.camel.StaticService; import org.apache.camel.spi.ExchangeFactory; import org.apache.camel.support.DefaultExchange; import org.apache.camel.support.SynchronizationAdapter; @@ -89,18 +97,19 @@ public class PooledExchangeFactory extends ServiceSupport created.incrementAndGet(); } // create a new exchange as there was no free from the pool - exchange = new DefaultExchange(camelContext); + ExtendedExchange answer = new DefaultExchange(camelContext); + if (autoRelease) { + // the consumer will either always be in auto release mode or not, so its safe to initialize the task only once when the exchange is created + answer.onDone(this::release); + } + return answer; } else { if (statisticsEnabled) { acquired.incrementAndGet(); } - // the exchange is reused but update the created to now + // reset exchange for reuse ExtendedExchange ee = exchange.adapt(ExtendedExchange.class); - ee.setCreated(System.currentTimeMillis()); - } - if (autoRelease) { - // add on completion which will return the exchange when done - exchange.adapt(ExtendedExchange.class).addOnCompletion(onCompletion); + ee.reset(System.currentTimeMillis()); } return exchange; } @@ -113,41 +122,41 @@ public class PooledExchangeFactory extends ServiceSupport created.incrementAndGet(); } // create a new exchange as there was no free from the pool - exchange = new DefaultExchange(fromEndpoint); + ExtendedExchange answer = new DefaultExchange(fromEndpoint); + if (autoRelease) { + // the consumer will either always be in auto release mode or not, so its safe to initialize the task only once when the exchange is created + answer.onDone(this::release); + } + return answer; } else { if (statisticsEnabled) { acquired.incrementAndGet(); } - // the exchange is reused but update the created to now + // reset exchange for reuse ExtendedExchange ee = exchange.adapt(ExtendedExchange.class); - ee.setCreated(System.currentTimeMillis()); - // need to mark this exchange from the given endpoint - ee.setFromEndpoint(fromEndpoint); - } - if (autoRelease) { - // add on completion which will return the exchange when done - exchange.adapt(ExtendedExchange.class).addOnCompletion(onCompletion); + ee.reset(System.currentTimeMillis()); } return exchange; } @Override - public void release(Exchange exchange) { + public boolean release(Exchange exchange) { // reset exchange before returning to pool try { ExtendedExchange ee = exchange.adapt(ExtendedExchange.class); - ee.reset(); + ee.done(); // only release back in pool if reset was success if (statisticsEnabled) { released.incrementAndGet(); } - pool.offer(exchange); + return pool.offer(exchange); } catch (Exception e) { if (statisticsEnabled) { discarded.incrementAndGet(); } LOG.debug("Error resetting exchange: {}. This exchange is discarded.", exchange); + return false; } } diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java index e373e43..a9d1e87 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java @@ -552,7 +552,7 @@ public class SimpleCamelContext extends AbstractCamelContext { ExchangeFactory.class); // TODO: experiment - // return result.orElseGet(DefaultExchangeFactory::new); + // return result.orElseGet(DefaultExchangeFactory::new); return result.orElseGet(PooledExchangeFactory::new); } diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java index ee12829..83c568c 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java @@ -109,7 +109,6 @@ public class DefaultConsumer extends ServiceSupport implements Consumer, RouteAw UnitOfWork uow = endpoint.getCamelContext().adapt(ExtendedCamelContext.class).getUnitOfWorkFactory() .createUnitOfWork(exchange); exchange.adapt(ExtendedExchange.class).setUnitOfWork(uow); - uow.start(); return uow; } diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java index 808bfb6..1ba720a 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java @@ -24,6 +24,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.function.Function; import org.apache.camel.CamelContext; import org.apache.camel.CamelExecutionException; @@ -45,6 +46,7 @@ import org.apache.camel.util.ObjectHelper; public final class DefaultExchange implements ExtendedExchange { private final CamelContext context; + private Function<Exchange, Boolean> onDone; private long created; // optimize to create properties always and with a reasonable small size private final Map<String, Object> properties = new ConcurrentHashMap<>(8); @@ -121,49 +123,62 @@ public final class DefaultExchange implements ExtendedExchange { } } - public void reset() { - this.properties.clear(); - this.exchangeId = null; - this.created = 0; - // TODO: optimize in/out to keep as default message (if original message is this kind) - this.in = null; - this.out = null; - this.exception = null; - // reset uow - if (this.unitOfWork != null) { - this.unitOfWork.reset(); - } - // reset pattern to original - this.pattern = originalPattern; - if (this.onCompletions != null) { - this.onCompletions.clear(); - } - // do not reset endpoint/fromRouteId as it would be the same consumer/endpoint again - this.externalRedelivered = null; - this.historyNodeId = null; - this.historyNodeLabel = null; - this.transacted = false; - this.routeStop = false; - this.rollbackOnly = false; - this.rollbackOnlyLast = false; - this.notifyEvent = false; - this.interrupted = false; - this.interruptable = true; - this.redeliveryExhausted = false; - this.errorHandlerHandled = null; + @Override + public void onDone(Function<Exchange, Boolean> task) { + this.onDone = task; } - @Override - public long getCreated() { - return created; + public void done() { + // only need to do this if there is an onDone task + // and use created flag to avoid doing done more than once + if (created > 0) { + this.created = 0; // by setting to 0 we also flag that this exchange is done and needs to be reset to use again + this.properties.clear(); + this.exchangeId = null; + // TODO: optimize in/out to keep as default message (if original message is this kind) + this.in = null; + this.out = null; + this.exception = null; + // reset uow + if (this.unitOfWork != null) { + this.unitOfWork.reset(); + } + // reset pattern to original + this.pattern = originalPattern; + if (this.onCompletions != null) { + this.onCompletions.clear(); + } + // do not reset endpoint/fromRouteId as it would be the same consumer/endpoint again + this.externalRedelivered = null; + this.historyNodeId = null; + this.historyNodeLabel = null; + this.transacted = false; + this.routeStop = false; + this.rollbackOnly = false; + this.rollbackOnlyLast = false; + this.notifyEvent = false; + this.interrupted = false; + this.interruptable = true; + this.redeliveryExhausted = false; + this.errorHandlerHandled = null; + + if (onDone != null) { + onDone.apply(this); + } + } } @Override - public void setCreated(long created) { + public void reset(long created) { this.created = created; } @Override + public long getCreated() { + return created; + } + + @Override public Exchange copy() { DefaultExchange exchange = new DefaultExchange(this); @@ -603,7 +618,7 @@ public final class DefaultExchange implements ExtendedExchange { @Override public void setUnitOfWork(UnitOfWork unitOfWork) { this.unitOfWork = unitOfWork; - if (unitOfWork != null && onCompletions != null) { + if (unitOfWork != null && onCompletions != null && !onCompletions.isEmpty()) { // now an unit of work has been assigned so add the on completions // we might have registered already for (Synchronization onCompletion : onCompletions) { @@ -612,7 +627,6 @@ public final class DefaultExchange implements ExtendedExchange { // cleanup the temporary on completion list as they now have been registered // on the unit of work onCompletions.clear(); - onCompletions = null; } } @@ -626,7 +640,7 @@ public final class DefaultExchange implements ExtendedExchange { } onCompletions.add(onCompletion); } else { - getUnitOfWork().addSynchronization(onCompletion); + unitOfWork.addSynchronization(onCompletion); } } @@ -643,13 +657,12 @@ public final class DefaultExchange implements ExtendedExchange { @Override public void handoverCompletions(Exchange target) { - if (onCompletions != null) { + if (onCompletions != null && !onCompletions.isEmpty()) { for (Synchronization onCompletion : onCompletions) { target.adapt(ExtendedExchange.class).addOnCompletion(onCompletion); } // cleanup the temporary on completion list as they have been handed over onCompletions.clear(); - onCompletions = null; } else if (unitOfWork != null) { // let unit of work handover unitOfWork.handoverSynchronization(target); @@ -659,10 +672,9 @@ public final class DefaultExchange implements ExtendedExchange { @Override public List<Synchronization> handoverCompletions() { List<Synchronization> answer = null; - if (onCompletions != null) { + if (onCompletions != null && !onCompletions.isEmpty()) { answer = new ArrayList<>(onCompletions); onCompletions.clear(); - onCompletions = null; } return answer; } diff --git a/core/camel-support/src/main/java/org/apache/camel/support/UnitOfWorkHelper.java b/core/camel-support/src/main/java/org/apache/camel/support/UnitOfWorkHelper.java index fd2194e..196c6ae 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/UnitOfWorkHelper.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/UnitOfWorkHelper.java @@ -21,7 +21,6 @@ import java.util.Collections; import java.util.List; import org.apache.camel.Exchange; -import org.apache.camel.ExtendedExchange; import org.apache.camel.Route; import org.apache.camel.spi.Synchronization; import org.apache.camel.spi.SynchronizationRouteAware; @@ -56,20 +55,11 @@ public final class UnitOfWorkHelper { LOG.warn("Exception occurred during done UnitOfWork for Exchange: {}. This exception will be ignored.", exchange, e); } - // stop - try { - uow.stop(); - } catch (Throwable e) { - LOG.warn("Exception occurred during stopping UnitOfWork for Exchange: {}. This exception will be ignored.", - exchange, e); - } - // MUST clear and set uow to null on exchange after done - ExtendedExchange ee = (ExtendedExchange) exchange; - ee.setUnitOfWork(null); } public static void doneSynchronizations(Exchange exchange, List<Synchronization> synchronizations, Logger log) { if (synchronizations != null && !synchronizations.isEmpty()) { + // TODO: only copy/sort if there is > 1 (if 1 then use directly (no for loop) // work on a copy of the list to avoid any modification which may cause ConcurrentModificationException List<Synchronization> copy = new ArrayList<>(synchronizations);
