This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-3.22.x
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 4477bbac44daed14f0096d23b43c06c8c609cd45
Author: Claus Ibsen <[email protected]>
AuthorDate: Mon Jun 17 12:17:41 2024 +0200

    CAMEL-20835: camel-core - Multicast/RecipientList use LRUCache with configurable cacheSize for internal errorHandler cache to avoid OOM. Thanks to Arthur Naseef for reproducer. (#14553)
---
 .../camel/processor/DefaultProcessorFactory.java   |  2 +-
 .../apache/camel/processor/MulticastProcessor.java | 43 ++++++++++++++--------
 .../org/apache/camel/processor/RecipientList.java  |  4 +-
 .../camel/processor/RecipientListProcessor.java    |  5 ++-
 .../java/org/apache/camel/processor/Splitter.java  |  6 +--
 .../org/apache/camel/reifier/MulticastReifier.java |  2 +-
 6 files changed, 36 insertions(+), 26 deletions(-)

diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/DefaultProcessorFactory.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/DefaultProcessorFactory.java
index 1ededba6bdd..36bad7b7ad5 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/DefaultProcessorFactory.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/DefaultProcessorFactory.java
@@ -118,7 +118,7 @@ public class DefaultProcessorFactory implements 
ProcessorFactory, BootstrapClose
             boolean shutdownExecutorService = (boolean) args[2];
             return new MulticastProcessor(
                     camelContext, null, processors, null, true, executor, 
shutdownExecutorService, false, false, 0,
-                    null, false, false);
+                    null, false, false, 0);
         } else if ("Pipeline".equals(definitionName)) {
             List<Processor> processors = (List<Processor>) args[0];
             return Pipeline.newInstance(camelContext, processors);
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index b11235a0e59..2e45254b9e7 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -64,9 +64,11 @@ import org.apache.camel.spi.RouteIdAware;
 import org.apache.camel.spi.UnitOfWork;
 import org.apache.camel.support.AsyncProcessorConverterHelper;
 import org.apache.camel.support.AsyncProcessorSupport;
+import org.apache.camel.support.CamelContextHelper;
 import org.apache.camel.support.DefaultExchange;
 import org.apache.camel.support.EventHelper;
 import org.apache.camel.support.ExchangeHelper;
+import org.apache.camel.support.LRUCacheFactory;
 import org.apache.camel.support.PatternHelper;
 import org.apache.camel.support.service.ServiceHelper;
 import org.apache.camel.util.CastUtils;
@@ -170,7 +172,8 @@ public class MulticastProcessor extends 
AsyncProcessorSupport
     private ExecutorService aggregateExecutorService;
     private boolean shutdownAggregateExecutorService;
     private final long timeout;
-    private final ConcurrentMap<Processor, Processor> errorHandlers = new 
ConcurrentHashMap<>();
+    private final int cacheSize;
+    private final ConcurrentMap<Processor, Processor> errorHandlers;
     private final boolean shareUnitOfWork;
 
     public MulticastProcessor(CamelContext camelContext, Route route, 
Collection<Processor> processors) {
@@ -179,7 +182,9 @@ public class MulticastProcessor extends 
AsyncProcessorSupport
 
     public MulticastProcessor(CamelContext camelContext, Route route, 
Collection<Processor> processors,
                               AggregationStrategy aggregationStrategy) {
-        this(camelContext, route, processors, aggregationStrategy, false, 
null, false, false, false, 0, null, false, false);
+        this(camelContext, route, processors, aggregationStrategy, false, null,
+             false, false, false, 0, null,
+             false, false, 
CamelContextHelper.getMaximumCachePoolSize(camelContext));
     }
 
     public MulticastProcessor(CamelContext camelContext, Route route, 
Collection<Processor> processors,
@@ -187,7 +192,7 @@ public class MulticastProcessor extends 
AsyncProcessorSupport
                               boolean parallelProcessing, ExecutorService 
executorService, boolean shutdownExecutorService,
                               boolean streaming,
                               boolean stopOnException, long timeout, Processor 
onPrepare, boolean shareUnitOfWork,
-                              boolean parallelAggregate) {
+                              boolean parallelAggregate, int cacheSize) {
         notNull(camelContext, "camelContext");
         this.camelContext = camelContext;
         this.internalProcessorFactory = 
camelContext.adapt(ExtendedCamelContext.class).getInternalProcessorFactory();
@@ -208,6 +213,13 @@ public class MulticastProcessor extends 
AsyncProcessorSupport
         this.parallelAggregate = parallelAggregate;
         this.processorExchangeFactory = 
camelContext.adapt(ExtendedCamelContext.class)
                 
.getProcessorExchangeFactory().newProcessorExchangeFactory(this);
+        this.cacheSize = cacheSize;
+        if (cacheSize >= 0) {
+            this.errorHandlers = (ConcurrentMap) 
LRUCacheFactory.newLRUCache(cacheSize);
+        } else {
+            // no cache
+            this.errorHandlers = null;
+        }
     }
 
     @Override
@@ -1022,6 +1034,7 @@ public class MulticastProcessor extends 
AsyncProcessorSupport
         return new DefaultProcessorExchangePair(index, processor, prepared, 
exchange);
     }
 
+    @SuppressWarnings("unchecked")
     protected Processor wrapInErrorHandler(Route route, Exchange exchange, 
Processor processor) {
         Processor answer;
         Processor key = processor;
@@ -1039,7 +1052,7 @@ public class MulticastProcessor extends 
AsyncProcessorSupport
             // for the entire multicast block again which will start from 
scratch again
 
             // lookup cached first to reuse and preserve memory
-            answer = errorHandlers.get(key);
+            answer = errorHandlers != null ? errorHandlers.get(key) : null;
             if (answer != null) {
                 LOG.trace("Using existing error handler for: {}", key);
                 return answer;
@@ -1058,16 +1071,7 @@ public class MulticastProcessor extends 
AsyncProcessorSupport
                 ServiceHelper.startService(answer);
 
                 // here we don't cache the child unit of work
-                if (!child) {
-                    // add to cache
-                    // TODO returned value ignored intentionally?
-                    // Findbugs alert:
-                    // The putIfAbsent method is typically used to ensure that 
a single value
-                    // is associated with a given key (the first value for 
which put if absent succeeds).
-                    // If you ignore the return value and retain a reference 
to the value passed in,
-                    // you run the risk of retaining a value that is not the 
one that is associated
-                    // with the key in the map. If it matters which one you 
use and you use the one
-                    // that isn't stored in the map, your program will behave 
incorrectly.
+                if (!child && errorHandlers != null) {
                     errorHandlers.putIfAbsent(key, answer);
                 }
 
@@ -1159,7 +1163,9 @@ public class MulticastProcessor extends 
AsyncProcessorSupport
     protected void doShutdown() throws Exception {
         ServiceHelper.stopAndShutdownServices(processors, errorHandlers, 
aggregationStrategy, processorExchangeFactory);
         // only clear error handlers when shutting down
-        errorHandlers.clear();
+        if (errorHandlers != null) {
+            errorHandlers.clear();
+        }
 
         if (shutdownExecutorService && executorService != null) {
             
getCamelContext().getExecutorServiceManager().shutdownNow(executorService);
@@ -1266,6 +1272,13 @@ public class MulticastProcessor extends 
AsyncProcessorSupport
         return timeout;
     }
 
+    /**
+     * Maximum cache size used for reusing processors
+     */
+    public int getCacheSize() {
+        return cacheSize;
+    }
+
     /**
      * Use {@link #getAggregationStrategy(org.apache.camel.Exchange)} instead.
      */
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientList.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientList.java
index dbccb6cacf7..beb0720be53 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientList.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientList.java
@@ -204,8 +204,8 @@ public class RecipientList extends AsyncProcessorSupport 
implements IdAware, Rou
 
         recipientListProcessor = new RecipientListProcessor(
                 camelContext, null, expression, delimiter, producerCache, 
getAggregationStrategy(),
-                isParallelProcessing(), getExecutorService(), 
isShutdownExecutorService(),
-                isStreaming(), isStopOnException(), getTimeout(), 
getOnPrepare(), isShareUnitOfWork(), isParallelAggregate());
+                isParallelProcessing(), getExecutorService(), 
isShutdownExecutorService(), isStreaming(),
+                isStopOnException(), getTimeout(), getOnPrepare(), 
isShareUnitOfWork(), isParallelAggregate(), getCacheSize());
         recipientListProcessor.setSynchronous(synchronous);
         recipientListProcessor.setErrorHandler(errorHandler);
         
recipientListProcessor.setAggregateExecutorService(aggregateExecutorService);
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
index 8c1a735a86d..b5b13ca6322 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
@@ -164,10 +164,11 @@ public class RecipientListProcessor extends 
MulticastProcessor {
                                   AggregationStrategy aggregationStrategy,
                                   boolean parallelProcessing, ExecutorService 
executorService, boolean shutdownExecutorService,
                                   boolean streaming, boolean stopOnException,
-                                  long timeout, Processor onPrepare, boolean 
shareUnitOfWork, boolean parallelAggregate) {
+                                  long timeout, Processor onPrepare, boolean 
shareUnitOfWork, boolean parallelAggregate,
+                                  int cacheSize) {
         super(camelContext, route, null, aggregationStrategy, 
parallelProcessing, executorService, shutdownExecutorService,
               streaming, stopOnException, timeout, onPrepare,
-              shareUnitOfWork, parallelAggregate);
+              shareUnitOfWork, parallelAggregate, cacheSize);
         this.expression = expression;
         this.delimiter = delimiter;
         this.producerCache = producerCache;
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java
index 597bc007391..a27d50ad283 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java
@@ -46,8 +46,6 @@ import org.apache.camel.support.ExchangeHelper;
 import org.apache.camel.support.ObjectHelper;
 import org.apache.camel.util.IOHelper;
 import org.apache.camel.util.StringHelper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import static org.apache.camel.util.ObjectHelper.notNull;
 
@@ -57,8 +55,6 @@ import static org.apache.camel.util.ObjectHelper.notNull;
  */
 public class Splitter extends MulticastProcessor implements AsyncProcessor, 
Traceable {
 
-    private static final Logger LOG = LoggerFactory.getLogger(Splitter.class);
-
     private static final String IGNORE_DELIMITER_MARKER = "false";
     private final Expression expression;
     private final String delimiter;
@@ -80,7 +76,7 @@ public class Splitter extends MulticastProcessor implements 
AsyncProcessor, Trac
                     boolean useSubUnitOfWork, boolean parallelAggregate, 
String delimiter) {
         super(camelContext, route, Collections.singleton(destination), 
aggregationStrategy, parallelProcessing, executorService,
               shutdownExecutorService, streaming, stopOnException,
-              timeout, onPrepare, useSubUnitOfWork, parallelAggregate);
+              timeout, onPrepare, useSubUnitOfWork, parallelAggregate, 0);
         this.expression = expression;
         StringHelper.notEmpty(delimiter, "delimiter");
         this.delimiter = delimiter;
diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/MulticastReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/MulticastReifier.java
index 86d2de72f2a..b9b6fa79129 100644
--- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/MulticastReifier.java
+++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/MulticastReifier.java
@@ -78,7 +78,7 @@ public class MulticastReifier extends 
ProcessorReifier<MulticastDefinition> {
 
         MulticastProcessor answer = new MulticastProcessor(
                 camelContext, route, list, strategy, isParallelProcessing, 
threadPool, shutdownThreadPool, isStreaming,
-                isStopOnException, timeout, prepare, isShareUnitOfWork, 
isParallelAggregate);
+                isStopOnException, timeout, prepare, isShareUnitOfWork, 
isParallelAggregate, 0);
         answer.setSynchronous(isSynchronous);
         return answer;
     }

Reply via email to