This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch camel-4.0.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-4.0.x by this push:
new a6493850008 CAMEL-20835: camel-core - Multicast/RecipientList use
LRUCache with configurable cacheSize for internal errorHandler cache to avoid
OOM. Thanks to Arthur Naseef for reproducer. (#14553)
a6493850008 is described below
commit a6493850008493b781e9431aea2e7ca2bfcfa51f
Author: Claus Ibsen <[email protected]>
AuthorDate: Mon Jun 17 12:17:41 2024 +0200
CAMEL-20835: camel-core - Multicast/RecipientList use LRUCache with
configurable cacheSize for internal errorHandler cache to avoid OOM. Thanks to
Arthur Naseef for reproducer. (#14553)
---
.../camel/processor/DefaultProcessorFactory.java | 2 +-
.../apache/camel/processor/MulticastProcessor.java | 43 ++++++++++++++--------
.../org/apache/camel/processor/RecipientList.java | 4 +-
.../camel/processor/RecipientListProcessor.java | 5 ++-
.../java/org/apache/camel/processor/Splitter.java | 6 +--
.../org/apache/camel/reifier/MulticastReifier.java | 2 +-
6 files changed, 36 insertions(+), 26 deletions(-)
diff --git
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/DefaultProcessorFactory.java
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/DefaultProcessorFactory.java
index 3e2eabffeb5..e97eaa5f34e 100644
---
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/DefaultProcessorFactory.java
+++
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/DefaultProcessorFactory.java
@@ -118,7 +118,7 @@ public class DefaultProcessorFactory implements
ProcessorFactory, BootstrapClose
boolean shutdownExecutorService = (boolean) args[2];
return new MulticastProcessor(
camelContext, null, processors, null, true, executor,
shutdownExecutorService, false, false, 0,
- null, false, false);
+ null, false, false, 0);
} else if ("Pipeline".equals(definitionName)) {
List<Processor> processors = (List<Processor>) args[0];
return Pipeline.newInstance(camelContext, processors);
diff --git
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index 9e5a486ca94..5a6dca72ea9 100644
---
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -62,9 +62,11 @@ import org.apache.camel.spi.RouteIdAware;
import org.apache.camel.spi.UnitOfWork;
import org.apache.camel.support.AsyncProcessorConverterHelper;
import org.apache.camel.support.AsyncProcessorSupport;
+import org.apache.camel.support.CamelContextHelper;
import org.apache.camel.support.DefaultExchange;
import org.apache.camel.support.EventHelper;
import org.apache.camel.support.ExchangeHelper;
+import org.apache.camel.support.LRUCacheFactory;
import org.apache.camel.support.PatternHelper;
import org.apache.camel.support.PluginHelper;
import org.apache.camel.support.service.ServiceHelper;
@@ -169,7 +171,8 @@ public class MulticastProcessor extends
AsyncProcessorSupport
private ExecutorService aggregateExecutorService;
private boolean shutdownAggregateExecutorService;
private final long timeout;
- private final ConcurrentMap<Processor, Processor> errorHandlers = new
ConcurrentHashMap<>();
+ private final int cacheSize;
+ private final ConcurrentMap<Processor, Processor> errorHandlers;
private final boolean shareUnitOfWork;
public MulticastProcessor(CamelContext camelContext, Route route,
Collection<Processor> processors) {
@@ -178,7 +181,9 @@ public class MulticastProcessor extends
AsyncProcessorSupport
public MulticastProcessor(CamelContext camelContext, Route route,
Collection<Processor> processors,
AggregationStrategy aggregationStrategy) {
- this(camelContext, route, processors, aggregationStrategy, false,
null, false, false, false, 0, null, false, false);
+ this(camelContext, route, processors, aggregationStrategy, false, null,
+ false, false, false, 0, null,
+ false, false,
CamelContextHelper.getMaximumCachePoolSize(camelContext));
}
public MulticastProcessor(CamelContext camelContext, Route route,
Collection<Processor> processors,
@@ -186,7 +191,7 @@ public class MulticastProcessor extends
AsyncProcessorSupport
boolean parallelProcessing, ExecutorService
executorService, boolean shutdownExecutorService,
boolean streaming,
boolean stopOnException, long timeout, Processor
onPrepare, boolean shareUnitOfWork,
- boolean parallelAggregate) {
+ boolean parallelAggregate, int cacheSize) {
notNull(camelContext, "camelContext");
this.camelContext = camelContext;
this.internalProcessorFactory =
PluginHelper.getInternalProcessorFactory(camelContext);
@@ -207,6 +212,13 @@ public class MulticastProcessor extends
AsyncProcessorSupport
this.parallelAggregate = parallelAggregate;
this.processorExchangeFactory = camelContext.getCamelContextExtension()
.getProcessorExchangeFactory().newProcessorExchangeFactory(this);
+ this.cacheSize = cacheSize;
+ if (cacheSize >= 0) {
+ this.errorHandlers = (ConcurrentMap)
LRUCacheFactory.newLRUCache(cacheSize);
+ } else {
+ // no cache
+ this.errorHandlers = null;
+ }
}
@Override
@@ -1021,6 +1033,7 @@ public class MulticastProcessor extends
AsyncProcessorSupport
return new DefaultProcessorExchangePair(index, processor, prepared,
exchange);
}
+ @SuppressWarnings("unchecked")
protected Processor wrapInErrorHandler(Route route, Exchange exchange,
Processor processor) {
Processor answer;
Processor key = processor;
@@ -1038,7 +1051,7 @@ public class MulticastProcessor extends
AsyncProcessorSupport
// for the entire multicast block again which will start from
scratch again
// lookup cached first to reuse and preserve memory
- answer = errorHandlers.get(key);
+ answer = errorHandlers != null ? errorHandlers.get(key) : null;
if (answer != null) {
LOG.trace("Using existing error handler for: {}", key);
return answer;
@@ -1057,16 +1070,7 @@ public class MulticastProcessor extends
AsyncProcessorSupport
ServiceHelper.startService(answer);
// here we don't cache the child unit of work
- if (!child) {
- // add to cache
- // TODO returned value ignored intentionally?
- // Findbugs alert:
- // The putIfAbsent method is typically used to ensure that
a single value
- // is associated with a given key (the first value for
which put if absent succeeds).
- // If you ignore the return value and retain a reference
to the value passed in,
- // you run the risk of retaining a value that is not the
one that is associated
- // with the key in the map. If it matters which one you
use and you use the one
- // that isn't stored in the map, your program will behave
incorrectly.
+ if (!child && errorHandlers != null) {
errorHandlers.putIfAbsent(key, answer);
}
@@ -1158,7 +1162,9 @@ public class MulticastProcessor extends
AsyncProcessorSupport
protected void doShutdown() throws Exception {
ServiceHelper.stopAndShutdownServices(processors, errorHandlers,
aggregationStrategy, processorExchangeFactory);
// only clear error handlers when shutting down
- errorHandlers.clear();
+ if (errorHandlers != null) {
+ errorHandlers.clear();
+ }
if (shutdownExecutorService && executorService != null) {
getCamelContext().getExecutorServiceManager().shutdownNow(executorService);
@@ -1265,6 +1271,13 @@ public class MulticastProcessor extends
AsyncProcessorSupport
return timeout;
}
+ /**
+ * Maximum cache size used for reusing processors
+ */
+ public int getCacheSize() {
+ return cacheSize;
+ }
+
/**
* Use {@link #getAggregationStrategy(org.apache.camel.Exchange)} instead.
*/
diff --git
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientList.java
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientList.java
index dbccb6cacf7..beb0720be53 100644
---
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientList.java
+++
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientList.java
@@ -204,8 +204,8 @@ public class RecipientList extends AsyncProcessorSupport
implements IdAware, Rou
recipientListProcessor = new RecipientListProcessor(
camelContext, null, expression, delimiter, producerCache,
getAggregationStrategy(),
- isParallelProcessing(), getExecutorService(),
isShutdownExecutorService(),
- isStreaming(), isStopOnException(), getTimeout(),
getOnPrepare(), isShareUnitOfWork(), isParallelAggregate());
+ isParallelProcessing(), getExecutorService(),
isShutdownExecutorService(), isStreaming(),
+ isStopOnException(), getTimeout(), getOnPrepare(),
isShareUnitOfWork(), isParallelAggregate(), getCacheSize());
recipientListProcessor.setSynchronous(synchronous);
recipientListProcessor.setErrorHandler(errorHandler);
recipientListProcessor.setAggregateExecutorService(aggregateExecutorService);
diff --git
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
index 4bf1ba68211..a8a6dccf517 100644
---
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
+++
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
@@ -162,10 +162,11 @@ public class RecipientListProcessor extends
MulticastProcessor {
AggregationStrategy aggregationStrategy,
boolean parallelProcessing, ExecutorService
executorService, boolean shutdownExecutorService,
boolean streaming, boolean stopOnException,
- long timeout, Processor onPrepare, boolean
shareUnitOfWork, boolean parallelAggregate) {
+ long timeout, Processor onPrepare, boolean
shareUnitOfWork, boolean parallelAggregate,
+ int cacheSize) {
super(camelContext, route, null, aggregationStrategy,
parallelProcessing, executorService, shutdownExecutorService,
streaming, stopOnException, timeout, onPrepare,
- shareUnitOfWork, parallelAggregate);
+ shareUnitOfWork, parallelAggregate, cacheSize);
this.expression = expression;
this.delimiter = delimiter;
this.producerCache = producerCache;
diff --git
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java
index 0224397e03b..062b668aa48 100644
---
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java
+++
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java
@@ -45,8 +45,6 @@ import org.apache.camel.support.ExchangeHelper;
import org.apache.camel.support.ObjectHelper;
import org.apache.camel.util.IOHelper;
import org.apache.camel.util.StringHelper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import static org.apache.camel.util.ObjectHelper.notNull;
@@ -56,8 +54,6 @@ import static org.apache.camel.util.ObjectHelper.notNull;
*/
public class Splitter extends MulticastProcessor implements AsyncProcessor,
Traceable {
- private static final Logger LOG = LoggerFactory.getLogger(Splitter.class);
-
private static final String IGNORE_DELIMITER_MARKER = "false";
private final Expression expression;
private final String delimiter;
@@ -79,7 +75,7 @@ public class Splitter extends MulticastProcessor implements
AsyncProcessor, Trac
boolean useSubUnitOfWork, boolean parallelAggregate,
String delimiter) {
super(camelContext, route, Collections.singleton(destination),
aggregationStrategy, parallelProcessing, executorService,
shutdownExecutorService, streaming, stopOnException,
- timeout, onPrepare, useSubUnitOfWork, parallelAggregate);
+ timeout, onPrepare, useSubUnitOfWork, parallelAggregate, 0);
this.expression = expression;
StringHelper.notEmpty(delimiter, "delimiter");
this.delimiter = delimiter;
diff --git
a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/MulticastReifier.java
b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/MulticastReifier.java
index 86d2de72f2a..b9b6fa79129 100644
---
a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/MulticastReifier.java
+++
b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/MulticastReifier.java
@@ -78,7 +78,7 @@ public class MulticastReifier extends
ProcessorReifier<MulticastDefinition> {
MulticastProcessor answer = new MulticastProcessor(
camelContext, route, list, strategy, isParallelProcessing,
threadPool, shutdownThreadPool, isStreaming,
- isStopOnException, timeout, prepare, isShareUnitOfWork,
isParallelAggregate);
+ isStopOnException, timeout, prepare, isShareUnitOfWork,
isParallelAggregate, 0);
answer.setSynchronous(isSynchronous);
return answer;
}