This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-4.18.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-4.18.x by this push:
     new cb1747369519 [backport camel-4.18.x] CAMEL-23840: Fix pollEnrich 
cacheSize(-1) not disabling consumer cache (#24285)
cb1747369519 is described below

commit cb17473695199c4f75aad3abaa24c58ddebd12e9
Author: Claus Ibsen <[email protected]>
AuthorDate: Mon Jun 29 09:21:36 2026 +0200

    [backport camel-4.18.x] CAMEL-23840: Fix pollEnrich cacheSize(-1) not 
disabling consumer cache (#24285)
    
    CAMEL-23840: Fix pollEnrich cacheSize(-1) not disabling consumer cache
    
    pollEnrich().cacheSize(-1) should disable the consumer cache entirely,
    but DefaultConsumerCache normalizes cacheSize <= 0 to the context
    maximum (default 1000), retaining up to 1000 polling consumers. For
    resource-backed components like SFTP, each retained consumer holds an
    open connection that is never cleaned up.
    
    Fix by introducing EmptyConsumerCache (mirroring EmptyProducerCache)
    that creates a fresh consumer on every acquire and stops it on release.
    PollEnricher.doBuild() now selects EmptyConsumerCache when cacheSize < 0,
    matching the pattern used by SendDynamicProcessor, RecipientList, and
    RoutingSlip on the producer side.
    
    Closes #24283
    
    Co-authored-by: Mark Snijder <[email protected]>
    Co-authored-by: OpenCode (DeepSeek V4 Pro) <[email protected]>
---
 .../org/apache/camel/processor/PollEnricher.java   | 12 +++-
 .../camel/impl/engine/EmptyConsumerCacheTest.java  | 67 ++++++++++++++++++
 .../camel/processor/PollEnrichNoCacheTest.java     | 72 ++++++++++++++++++-
 .../camel/support/cache/DefaultConsumerCache.java  | 19 +++--
 .../camel/support/cache/EmptyConsumerCache.java    | 80 ++++++++++++++++++++++
 5 files changed, 241 insertions(+), 9 deletions(-)

diff --git 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/PollEnricher.java
 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PollEnricher.java
index 50b83dca05e0..dc607426629b 100644
--- 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/PollEnricher.java
+++ 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PollEnricher.java
@@ -48,6 +48,7 @@ import org.apache.camel.support.EndpointHelper;
 import org.apache.camel.support.EventDrivenPollingConsumer;
 import org.apache.camel.support.ExchangeHelper;
 import org.apache.camel.support.cache.DefaultConsumerCache;
+import org.apache.camel.support.cache.EmptyConsumerCache;
 import org.apache.camel.support.service.ServiceHelper;
 import org.apache.camel.util.URISupport;
 import org.slf4j.Logger;
@@ -532,9 +533,14 @@ public class PollEnricher extends BaseProcessorSupport 
implements IdAware, Route
     @Override
     protected void doBuild() throws Exception {
         if (consumerCache == null) {
-            // create consumer cache if we use dynamic expressions for 
computing the endpoints to poll
-            consumerCache = new DefaultConsumerCache(this, camelContext, 
cacheSize);
-            LOG.debug("PollEnrich {} using ConsumerCache with cacheSize={}", 
this, cacheSize);
+            if (cacheSize < 0) {
+                consumerCache = new EmptyConsumerCache(this, camelContext);
+                LOG.debug("PollEnrich {} is not using ConsumerCache", this);
+            } else {
+                // create consumer cache if we use dynamic expressions for 
computing the endpoints to poll
+                consumerCache = new DefaultConsumerCache(this, camelContext, 
cacheSize);
+                LOG.debug("PollEnrich {} using ConsumerCache with 
cacheSize={}", this, cacheSize);
+            }
         }
         if (aggregationStrategy == null) {
             aggregationStrategy = new CopyAggregationStrategy();
diff --git 
a/core/camel-core/src/test/java/org/apache/camel/impl/engine/EmptyConsumerCacheTest.java
 
b/core/camel-core/src/test/java/org/apache/camel/impl/engine/EmptyConsumerCacheTest.java
new file mode 100644
index 000000000000..be89c3d7886b
--- /dev/null
+++ 
b/core/camel-core/src/test/java/org/apache/camel/impl/engine/EmptyConsumerCacheTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl.engine;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Endpoint;
+import org.apache.camel.PollingConsumer;
+import org.apache.camel.spi.ConsumerCache;
+import org.apache.camel.support.cache.EmptyConsumerCache;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class EmptyConsumerCacheTest extends ContextTestSupport {
+
+    @Test
+    public void testEmptyCache() {
+        ConsumerCache cache = new EmptyConsumerCache(this, context);
+        cache.start();
+
+        assertEquals(0, cache.size(), "Size should be 0");
+
+        // we never cache any consumers
+        Endpoint e = context.getEndpoint("direct:queue:1");
+        PollingConsumer c = cache.acquirePollingConsumer(e);
+
+        assertEquals(0, cache.size(), "Size should be 0");
+
+        cache.releasePollingConsumer(e, c);
+
+        assertEquals(0, cache.size(), "Size should be 0");
+
+        cache.stop();
+    }
+
+    @Test
+    public void testCacheConsumerAcquireAndRelease() {
+        ConsumerCache cache = new EmptyConsumerCache(this, context);
+        cache.start();
+
+        assertEquals(0, cache.size(), "Size should be 0");
+
+        // we never cache any consumers
+        for (int i = 0; i < 1003; i++) {
+            Endpoint e = context.getEndpoint("direct:queue:" + i);
+            PollingConsumer c = cache.acquirePollingConsumer(e);
+            cache.releasePollingConsumer(e, c);
+        }
+
+        assertEquals(0, cache.size(), "Size should be 0");
+        cache.stop();
+    }
+}
diff --git 
a/core/camel-core/src/test/java/org/apache/camel/processor/PollEnrichNoCacheTest.java
 
b/core/camel-core/src/test/java/org/apache/camel/processor/PollEnrichNoCacheTest.java
index b6c3cd1cc44d..7b7dfb19aa59 100644
--- 
a/core/camel-core/src/test/java/org/apache/camel/processor/PollEnrichNoCacheTest.java
+++ 
b/core/camel-core/src/test/java/org/apache/camel/processor/PollEnrichNoCacheTest.java
@@ -17,10 +17,20 @@
 package org.apache.camel.processor;
 
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.camel.Consumer;
 import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.PollingConsumer;
 import org.apache.camel.Processor;
+import org.apache.camel.Producer;
 import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.support.DefaultComponent;
+import org.apache.camel.support.DefaultEndpoint;
+import org.apache.camel.support.PollingConsumerSupport;
 import org.junit.jupiter.api.Test;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -30,6 +40,9 @@ public class PollEnrichNoCacheTest extends ContextTestSupport 
{
 
     @Test
     public void testNoCache() throws Exception {
+        final AtomicInteger stopped = new AtomicInteger();
+        context.addComponent("pollAssert", stopCountingComponent(stopped));
+
         assertEquals(1, context.getEndpointRegistry().size());
 
         sendBody("foo", "seda:x");
@@ -39,7 +52,7 @@ public class PollEnrichNoCacheTest extends ContextTestSupport 
{
         sendBody("bar", "seda:y");
         sendBody("bar", "seda:z");
 
-        // make sure its using an empty producer cache as the cache is disabled
+        // make sure its using an empty consumer cache as the cache is disabled
         List<Processor> list = getProcessors("foo");
         PollEnricher ep = (PollEnricher) list.get(0);
         assertNotNull(ep);
@@ -68,12 +81,69 @@ public class PollEnrichNoCacheTest extends 
ContextTestSupport {
         assertMockEndpointsSatisfied();
 
         assertEquals(4, context.getEndpointRegistry().size());
+
+        // also verify that cacheSize(-1) means consumers are not retained
+        sendBody("poll-one", "pollAssert:one");
+        sendBody("poll-two", "pollAssert:two");
+
+        assertEquals(2, stopped.get());
     }
 
     protected void sendBody(String body, String uri) {
         template.sendBodyAndHeader("direct:a", body, "myHeader", uri);
     }
 
+    /**
+     * A test-only component whose polling consumers increment the given 
counter in
+     * {@link PollingConsumerSupport#doStop()}, allowing the test to verify 
whether the consumer cache retains or
+     * discards consumers after use.
+     */
+    private static DefaultComponent stopCountingComponent(AtomicInteger 
stopped) {
+        return new DefaultComponent() {
+            @Override
+            protected Endpoint createEndpoint(String uri, String remaining, 
Map<String, Object> parameters) {
+                return new DefaultEndpoint(uri, this) {
+                    @Override
+                    public Producer createProducer() {
+                        return null;
+                    }
+
+                    @Override
+                    public Consumer createConsumer(Processor processor) {
+                        return null;
+                    }
+
+                    @Override
+                    public PollingConsumer createPollingConsumer() {
+                        return new PollingConsumerSupport(this) {
+                            @Override
+                            public Exchange receive() {
+                                Exchange ex = getEndpoint().createExchange();
+                                ex.getIn().setBody(remaining);
+                                return ex;
+                            }
+
+                            @Override
+                            public Exchange receive(long timeout) {
+                                return receive();
+                            }
+
+                            @Override
+                            public Exchange receiveNoWait() {
+                                return receive();
+                            }
+
+                            @Override
+                            protected void doStop() {
+                                stopped.incrementAndGet();
+                            }
+                        };
+                    }
+                };
+            }
+        };
+    }
+
     @Override
     protected RouteBuilder createRouteBuilder() {
         return new RouteBuilder() {
diff --git 
a/core/camel-support/src/main/java/org/apache/camel/support/cache/DefaultConsumerCache.java
 
b/core/camel-support/src/main/java/org/apache/camel/support/cache/DefaultConsumerCache.java
index f85ab47ee68f..4e95e9991509 100644
--- 
a/core/camel-support/src/main/java/org/apache/camel/support/cache/DefaultConsumerCache.java
+++ 
b/core/camel-support/src/main/java/org/apache/camel/support/cache/DefaultConsumerCache.java
@@ -53,7 +53,12 @@ public class DefaultConsumerCache extends ServiceSupport 
implements ConsumerCach
         this.source = source;
         this.camelContext = camelContext;
         this.maxCacheSize = cacheSize <= 0 ? 
CamelContextHelper.getMaximumCachePoolSize(camelContext) : cacheSize;
-        this.consumers = createServicePool(camelContext, maxCacheSize);
+        if (cacheSize >= 0) {
+            this.consumers = createServicePool(camelContext, maxCacheSize);
+        } else {
+            // no cache then empty
+            this.consumers = null;
+        }
         // only if JMX is enabled
         if (camelContext.getManagementStrategy().getManagementAgent() != null) 
{
             this.extendedStatistics
@@ -194,7 +199,7 @@ public class DefaultConsumerCache extends ServiceSupport 
implements ConsumerCach
      */
     @Override
     public int size() {
-        int size = consumers.size();
+        int size = consumers != null ? consumers.size() : 0;
         LOG.trace("size = {}", size);
         return size;
     }
@@ -207,8 +212,10 @@ public class DefaultConsumerCache extends ServiceSupport 
implements ConsumerCach
         lock.lock();
         try {
             try {
-                consumers.stop();
-                consumers.start();
+                if (consumers != null) {
+                    consumers.stop();
+                    consumers.start();
+                }
             } catch (Exception e) {
                 LOG.debug("Error restarting consumer pool", e);
             }
@@ -222,7 +229,9 @@ public class DefaultConsumerCache extends ServiceSupport 
implements ConsumerCach
 
     @Override
     public void cleanUp() {
-        consumers.cleanUp();
+        if (consumers != null) {
+            consumers.cleanUp();
+        }
     }
 
     @Override
diff --git 
a/core/camel-support/src/main/java/org/apache/camel/support/cache/EmptyConsumerCache.java
 
b/core/camel-support/src/main/java/org/apache/camel/support/cache/EmptyConsumerCache.java
new file mode 100644
index 000000000000..bca84dee8252
--- /dev/null
+++ 
b/core/camel-support/src/main/java/org/apache/camel/support/cache/EmptyConsumerCache.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.support.cache;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
+import org.apache.camel.FailedToCreateConsumerException;
+import org.apache.camel.PollingConsumer;
+import org.apache.camel.support.service.ServiceHelper;
+
+/**
+ * A {@link org.apache.camel.spi.ConsumerCache} that does not cache {@link 
PollingConsumer}s but instead creates a new
+ * consumer on every {@link #acquirePollingConsumer(Endpoint)} and stops and 
shuts it down on
+ * {@link #releasePollingConsumer(Endpoint, PollingConsumer)}.
+ *
+ * @since 4.21
+ */
+public class EmptyConsumerCache extends DefaultConsumerCache {
+
+    private final Object source;
+    private final CamelContext ecc;
+
+    public EmptyConsumerCache(Object source, CamelContext camelContext) {
+        super(source, camelContext, -1);
+        this.source = source;
+        this.ecc = camelContext;
+        setExtendedStatistics(false);
+    }
+
+    @Override
+    public PollingConsumer acquirePollingConsumer(Endpoint endpoint) {
+        // always create a new consumer
+        PollingConsumer answer;
+        try {
+            answer = endpoint.createPollingConsumer();
+            boolean startingRoutes
+                    = ecc.getCamelContextExtension().isSetupRoutes() || 
ecc.getRouteController().isStartingRoutes();
+            if (startingRoutes) {
+                // if we are currently starting a route, then add as service 
and enlist in JMX
+                getCamelContext().addService(answer);
+            } else {
+                // must then start service so consumer is ready to be used
+                ServiceHelper.startService(answer);
+            }
+        } catch (Exception e) {
+            throw new FailedToCreateConsumerException(endpoint, e);
+        }
+        return answer;
+    }
+
+    @Override
+    public void releasePollingConsumer(Endpoint endpoint, PollingConsumer 
pollingConsumer) {
+        // stop and shutdown the consumer as its not cache or reused
+        ServiceHelper.stopAndShutdownService(pollingConsumer);
+    }
+
+    @Override
+    public int getCapacity() {
+        return 0;
+    }
+
+    @Override
+    public String toString() {
+        return "EmptyConsumerCache for source: " + source;
+    }
+}

Reply via email to