This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch camel-3.22.x in repository https://gitbox.apache.org/repos/asf/camel.git
commit 4477bbac44daed14f0096d23b43c06c8c609cd45 Author: Claus Ibsen <[email protected]> AuthorDate: Mon Jun 17 12:17:41 2024 +0200 CAMEL-20835: camel-core - Multicast/RecipientList use LRUCache with configurable cacheSize for internal errorHandler cache to avoid OOM. Thanks to Arthur Naseef for reproducer. (#14553) --- .../camel/processor/DefaultProcessorFactory.java | 2 +- .../apache/camel/processor/MulticastProcessor.java | 43 ++++++++++++++-------- .../org/apache/camel/processor/RecipientList.java | 4 +- .../camel/processor/RecipientListProcessor.java | 5 ++- .../java/org/apache/camel/processor/Splitter.java | 6 +-- .../org/apache/camel/reifier/MulticastReifier.java | 2 +- 6 files changed, 36 insertions(+), 26 deletions(-) diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/DefaultProcessorFactory.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/DefaultProcessorFactory.java index 1ededba6bdd..36bad7b7ad5 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/DefaultProcessorFactory.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/DefaultProcessorFactory.java @@ -118,7 +118,7 @@ public class DefaultProcessorFactory implements ProcessorFactory, BootstrapClose boolean shutdownExecutorService = (boolean) args[2]; return new MulticastProcessor( camelContext, null, processors, null, true, executor, shutdownExecutorService, false, false, 0, - null, false, false); + null, false, false, 0); } else if ("Pipeline".equals(definitionName)) { List<Processor> processors = (List<Processor>) args[0]; return Pipeline.newInstance(camelContext, processors); diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java index b11235a0e59..2e45254b9e7 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java @@ -64,9 +64,11 @@ import org.apache.camel.spi.RouteIdAware; import org.apache.camel.spi.UnitOfWork; import org.apache.camel.support.AsyncProcessorConverterHelper; import org.apache.camel.support.AsyncProcessorSupport; +import org.apache.camel.support.CamelContextHelper; import org.apache.camel.support.DefaultExchange; import org.apache.camel.support.EventHelper; import org.apache.camel.support.ExchangeHelper; +import org.apache.camel.support.LRUCacheFactory; import org.apache.camel.support.PatternHelper; import org.apache.camel.support.service.ServiceHelper; import org.apache.camel.util.CastUtils; @@ -170,7 +172,8 @@ public class MulticastProcessor extends AsyncProcessorSupport private ExecutorService aggregateExecutorService; private boolean shutdownAggregateExecutorService; private final long timeout; - private final ConcurrentMap<Processor, Processor> errorHandlers = new ConcurrentHashMap<>(); + private final int cacheSize; + private final ConcurrentMap<Processor, Processor> errorHandlers; private final boolean shareUnitOfWork; public MulticastProcessor(CamelContext camelContext, Route route, Collection<Processor> processors) { @@ -179,7 +182,9 @@ public class MulticastProcessor extends AsyncProcessorSupport public MulticastProcessor(CamelContext camelContext, Route route, Collection<Processor> processors, AggregationStrategy aggregationStrategy) { - this(camelContext, route, processors, aggregationStrategy, false, null, false, false, false, 0, null, false, false); + this(camelContext, route, processors, aggregationStrategy, false, null, + false, false, false, 0, null, + false, false, CamelContextHelper.getMaximumCachePoolSize(camelContext)); } public MulticastProcessor(CamelContext camelContext, Route route, Collection<Processor> processors, @@ -187,7 +192,7 @@ public class MulticastProcessor extends AsyncProcessorSupport boolean parallelProcessing, ExecutorService executorService, boolean shutdownExecutorService, boolean streaming, boolean stopOnException, long timeout, Processor onPrepare, boolean shareUnitOfWork, - boolean parallelAggregate) { + boolean parallelAggregate, int cacheSize) { notNull(camelContext, "camelContext"); this.camelContext = camelContext; this.internalProcessorFactory = camelContext.adapt(ExtendedCamelContext.class).getInternalProcessorFactory(); @@ -208,6 +213,13 @@ public class MulticastProcessor extends AsyncProcessorSupport this.parallelAggregate = parallelAggregate; this.processorExchangeFactory = camelContext.adapt(ExtendedCamelContext.class) .getProcessorExchangeFactory().newProcessorExchangeFactory(this); + this.cacheSize = cacheSize; + if (cacheSize >= 0) { + this.errorHandlers = (ConcurrentMap) LRUCacheFactory.newLRUCache(cacheSize); + } else { + // no cache + this.errorHandlers = null; + } } @Override @@ -1022,6 +1034,7 @@ public class MulticastProcessor extends AsyncProcessorSupport return new DefaultProcessorExchangePair(index, processor, prepared, exchange); } + @SuppressWarnings("unchecked") protected Processor wrapInErrorHandler(Route route, Exchange exchange, Processor processor) { Processor answer; Processor key = processor; @@ -1039,7 +1052,7 @@ public class MulticastProcessor extends AsyncProcessorSupport // for the entire multicast block again which will start from scratch again // lookup cached first to reuse and preserve memory - answer = errorHandlers.get(key); + answer = errorHandlers != null ? errorHandlers.get(key) : null; if (answer != null) { LOG.trace("Using existing error handler for: {}", key); return answer; @@ -1058,16 +1071,7 @@ public class MulticastProcessor extends AsyncProcessorSupport ServiceHelper.startService(answer); // here we don't cache the child unit of work - if (!child) { - // add to cache - // TODO returned value ignored intentionally? - // Findbugs alert: - // The putIfAbsent method is typically used to ensure that a single value - // is associated with a given key (the first value for which put if absent succeeds). - // If you ignore the return value and retain a reference to the value passed in, - // you run the risk of retaining a value that is not the one that is associated - // with the key in the map. If it matters which one you use and you use the one - // that isn't stored in the map, your program will behave incorrectly. + if (!child && errorHandlers != null) { errorHandlers.putIfAbsent(key, answer); } @@ -1159,7 +1163,9 @@ public class MulticastProcessor extends AsyncProcessorSupport protected void doShutdown() throws Exception { ServiceHelper.stopAndShutdownServices(processors, errorHandlers, aggregationStrategy, processorExchangeFactory); // only clear error handlers when shutting down - errorHandlers.clear(); + if (errorHandlers != null) { + errorHandlers.clear(); + } if (shutdownExecutorService && executorService != null) { getCamelContext().getExecutorServiceManager().shutdownNow(executorService); @@ -1266,6 +1272,13 @@ public class MulticastProcessor extends AsyncProcessorSupport return timeout; } + /** + * Maximum cache size used for reusing processors + */ + public int getCacheSize() { + return cacheSize; + } + /** * Use {@link #getAggregationStrategy(org.apache.camel.Exchange)} instead. */ diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientList.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientList.java index dbccb6cacf7..beb0720be53 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientList.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientList.java @@ -204,8 +204,8 @@ public class RecipientList extends AsyncProcessorSupport implements IdAware, Rou recipientListProcessor = new RecipientListProcessor( camelContext, null, expression, delimiter, producerCache, getAggregationStrategy(), - isParallelProcessing(), getExecutorService(), isShutdownExecutorService(), - isStreaming(), isStopOnException(), getTimeout(), getOnPrepare(), isShareUnitOfWork(), isParallelAggregate()); + isParallelProcessing(), getExecutorService(), isShutdownExecutorService(), isStreaming(), + isStopOnException(), getTimeout(), getOnPrepare(), isShareUnitOfWork(), isParallelAggregate(), getCacheSize()); recipientListProcessor.setSynchronous(synchronous); recipientListProcessor.setErrorHandler(errorHandler); recipientListProcessor.setAggregateExecutorService(aggregateExecutorService); diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java index 8c1a735a86d..b5b13ca6322 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java @@ -164,10 +164,11 @@ public class RecipientListProcessor extends MulticastProcessor { AggregationStrategy aggregationStrategy, boolean parallelProcessing, ExecutorService executorService, boolean shutdownExecutorService, boolean streaming, boolean stopOnException, - long timeout, Processor onPrepare, boolean shareUnitOfWork, boolean parallelAggregate) { + long timeout, Processor onPrepare, boolean shareUnitOfWork, boolean parallelAggregate, + int cacheSize) { super(camelContext, route, null, aggregationStrategy, parallelProcessing, executorService, shutdownExecutorService, streaming, stopOnException, timeout, onPrepare, - shareUnitOfWork, parallelAggregate); + shareUnitOfWork, parallelAggregate, cacheSize); this.expression = expression; this.delimiter = delimiter; this.producerCache = producerCache; diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java index 597bc007391..a27d50ad283 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java @@ -46,8 +46,6 @@ import org.apache.camel.support.ExchangeHelper; import org.apache.camel.support.ObjectHelper; import org.apache.camel.util.IOHelper; import org.apache.camel.util.StringHelper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import static org.apache.camel.util.ObjectHelper.notNull; @@ -57,8 +55,6 @@ import static org.apache.camel.util.ObjectHelper.notNull; */ public class Splitter extends MulticastProcessor implements AsyncProcessor, Traceable { - private static final Logger LOG = LoggerFactory.getLogger(Splitter.class); - private static final String IGNORE_DELIMITER_MARKER = "false"; private final Expression expression; private final String delimiter; @@ -80,7 +76,7 @@ public class Splitter extends MulticastProcessor implements AsyncProcessor, Trac boolean useSubUnitOfWork, boolean parallelAggregate, String delimiter) { super(camelContext, route, Collections.singleton(destination), aggregationStrategy, parallelProcessing, executorService, shutdownExecutorService, streaming, stopOnException, - timeout, onPrepare, useSubUnitOfWork, parallelAggregate); + timeout, onPrepare, useSubUnitOfWork, parallelAggregate, 0); this.expression = expression; StringHelper.notEmpty(delimiter, "delimiter"); this.delimiter = delimiter; diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/MulticastReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/MulticastReifier.java index 86d2de72f2a..b9b6fa79129 100644 --- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/MulticastReifier.java +++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/MulticastReifier.java @@ -78,7 +78,7 @@ public class MulticastReifier extends ProcessorReifier<MulticastDefinition> { MulticastProcessor answer = new MulticastProcessor( camelContext, route, list, strategy, isParallelProcessing, threadPool, shutdownThreadPool, isStreaming, - isStopOnException, timeout, prepare, isShareUnitOfWork, isParallelAggregate); + isStopOnException, timeout, prepare, isShareUnitOfWork, isParallelAggregate, 0); answer.setSynchronous(isSynchronous); return answer; }
