This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit 75a34636810791ed2407d2e6a687637e012aad4d Author: Claus Ibsen <[email protected]> AuthorDate: Wed Apr 7 14:34:09 2021 +0200 CAMEL-16462: camel-core - Optimize Multicast EIP to reduce object allocations. --- .../apache/camel/processor/MulticastProcessor.java | 30 +++++----------------- 1 file changed, 7 insertions(+), 23 deletions(-) diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java index cd84349..0d2eaba 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java @@ -71,7 +71,6 @@ import org.apache.camel.support.PatternHelper; import org.apache.camel.support.service.ServiceHelper; import org.apache.camel.util.CastUtils; import org.apache.camel.util.IOHelper; -import org.apache.camel.util.KeyValueHolder; import org.apache.camel.util.StopWatch; import org.apache.camel.util.concurrent.AsyncCompletionService; import org.slf4j.Logger; @@ -140,19 +139,6 @@ public class MulticastProcessor extends AsyncProcessorSupport } - /** - * Class that represents prepared fine grained error handlers when processing multicasted/splitted exchanges - * <p/> - * See the <tt>createProcessorExchangePair</tt> and <tt>createErrorHandler</tt> methods. - */ - static final class ErrorHandlerKey extends KeyValueHolder<Route, Processor> { - - ErrorHandlerKey(Route key, Processor value) { - super(key, value); - } - - } - private final class Scheduler implements Executor { @Override @@ -183,7 +169,7 @@ public class MulticastProcessor extends AsyncProcessorSupport private ExecutorService aggregateExecutorService; private boolean shutdownAggregateExecutorService; private final long timeout; - private final ConcurrentMap<ErrorHandlerKey, Processor> errorHandlers = new ConcurrentHashMap<>(); + private final ConcurrentMap<Processor, Processor> errorHandlers = new ConcurrentHashMap<>(); private final boolean shareUnitOfWork; public MulticastProcessor(CamelContext camelContext, Route route, Collection<Processor> processors) { @@ -1013,32 +999,30 @@ public class MulticastProcessor extends AsyncProcessorSupport protected Processor wrapInErrorHandler(Route route, Exchange exchange, Processor processor) { Processor answer; + Processor key = processor; if (route != this.route && this.route != null) { throw new UnsupportedOperationException("Is this really correct ?"); } - boolean tryBlock = exchange.getProperty(ExchangePropertyKey.TRY_ROUTE_BLOCK, boolean.class); + Boolean tryBlock = (Boolean) exchange.getProperty(ExchangePropertyKey.TRY_ROUTE_BLOCK); // do not wrap in error handler if we are inside a try block - if (!tryBlock && route != null) { + if (route != null && (tryBlock == null || !tryBlock)) { // wrap the producer in error handler so we have fine grained error handling on // the output side instead of the input side // this is needed to support redelivery on that output alone and not doing redelivery // for the entire multicast block again which will start from scratch again - // create key for cache - final ErrorHandlerKey key = new ErrorHandlerKey(route, processor); - // lookup cached first to reuse and preserve memory answer = errorHandlers.get(key); if (answer != null) { - LOG.trace("Using existing error handler for: {}", processor); + LOG.trace("Using existing error handler for: {}", key); return answer; } - LOG.trace("Creating error handler for: {}", processor); + LOG.trace("Creating error handler for: {}", key); try { - processor = wrapInErrorHandler(route, processor); + processor = wrapInErrorHandler(route, key); // and wrap in unit of work processor so the copy exchange also can run under UoW answer = createUnitOfWorkProcessor(route, processor, exchange);
