This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch poll-dyn
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 01f690efecadb7ecf17d2cc93b25f8d3c75a9733
Author: Claus Ibsen <[email protected]>
AuthorDate: Fri Feb 7 22:12:17 2025 +0100

    CAMEL-21733: camel-core - Poll EIP to support DynamicAware to reuse 
endpoints during dynamic poll EIP
---
 .../apache/camel/component/file/FileConsumer.java  | 25 +++++----
 .../camel/component/file/GenericFileConsumer.java  | 46 ++++++++++-----
 .../component/file/GenericFilePollingConsumer.java | 34 ++++++++---
 .../org/apache/camel/DynamicPollingConsumer.java   | 65 ++++++++++++++++++++++
 .../apache/camel/model/PollEnrichDefinition.java   | 20 +++++++
 .../org/apache/camel/processor/PollEnricher.java   | 13 ++++-
 ...> PollDynamicFileNameOptimizeDisabledTest.java} | 31 ++++++++++-
 .../enricher/PollDynamicFileNameTest.java          | 26 ++++++++-
 .../camel/support/ScheduledPollConsumer.java       | 12 ++++
 9 files changed, 232 insertions(+), 40 deletions(-)

diff --git 
a/components/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java
 
b/components/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java
index d2b6dfbd57b..58df9d374c2 100644
--- 
a/components/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java
+++ 
b/components/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java
@@ -74,7 +74,7 @@ public class FileConsumer extends GenericFileConsumer<File> 
implements ResumeAwa
         return exchange;
     }
 
-    private boolean pollDirectory(File directory, List<GenericFile<File>> 
fileList, int depth) {
+    private boolean pollDirectory(Exchange dynamic, File directory, 
List<GenericFile<File>> fileList, int depth) {
         depth++;
 
         if (LOG.isTraceEnabled()) {
@@ -89,14 +89,14 @@ public class FileConsumer extends GenericFileConsumer<File> 
implements ResumeAwa
             Arrays.sort(files, Comparator.comparing(File::getAbsoluteFile));
         }
 
-        if (processPolledFiles(fileList, depth, files)) {
+        if (processPolledFiles(dynamic, fileList, depth, files)) {
             return false;
         }
 
         return true;
     }
 
-    private boolean processPolledFiles(List<GenericFile<File>> fileList, int 
depth, File[] files) {
+    private boolean processPolledFiles(Exchange dynamic, 
List<GenericFile<File>> fileList, int depth, File[] files) {
         for (File file : files) {
             // check if we can continue polling in files
             if (!canPollMoreFiles(fileList)) {
@@ -125,7 +125,7 @@ public class FileConsumer extends GenericFileConsumer<File> 
implements ResumeAwa
                 }
             }
 
-            if (processEntry(fileList, depth, file, gf, files)) {
+            if (processEntry(dynamic, fileList, depth, file, gf, files)) {
                 return true;
             }
         }
@@ -133,23 +133,25 @@ public class FileConsumer extends 
GenericFileConsumer<File> implements ResumeAwa
     }
 
     private boolean processEntry(
+            Exchange dynamic,
             List<GenericFile<File>> fileList, int depth, File file, 
Supplier<GenericFile<File>> gf, File[] files) {
         if (file.isDirectory()) {
-            return processDirectoryEntry(fileList, depth, file, gf, files);
+            return processDirectoryEntry(dynamic, fileList, depth, file, gf, 
files);
         } else {
-            processFileEntry(fileList, depth, file, gf, files);
+            processFileEntry(dynamic, fileList, depth, file, gf, files);
 
         }
         return false;
     }
 
     private void processFileEntry(
+            Exchange dynamic,
             List<GenericFile<File>> fileList, int depth, File file, 
Supplier<GenericFile<File>> gf, File[] files) {
         // Windows can report false to a file on a share so regard it
         // always as a file (if it is not a directory)
         if (depth >= endpoint.minDepth) {
             boolean valid
-                    = isValidFile(gf, file.getName(), file.getAbsolutePath(),
+                    = isValidFile(dynamic, gf, file.getName(), 
file.getAbsolutePath(),
                             getRelativeFilePath(endpointPath, null, null, 
file),
                             false, files);
             if (valid) {
@@ -168,14 +170,15 @@ public class FileConsumer extends 
GenericFileConsumer<File> implements ResumeAwa
     }
 
     private boolean processDirectoryEntry(
+            Exchange dynamic,
             List<GenericFile<File>> fileList, int depth, File file, 
Supplier<GenericFile<File>> gf, File[] files) {
         if (endpoint.isRecursive() && depth < endpoint.getMaxDepth()) {
             boolean valid
-                    = isValidFile(gf, file.getName(), file.getAbsolutePath(),
+                    = isValidFile(dynamic, gf, file.getName(), 
file.getAbsolutePath(),
                             getRelativeFilePath(endpointPath, null, null, 
file),
                             true, files);
             if (valid) {
-                boolean canPollMore = pollDirectory(file, fileList, depth);
+                boolean canPollMore = pollDirectory(dynamic, file, fileList, 
depth);
                 return !canPollMore;
             }
         }
@@ -194,7 +197,7 @@ public class FileConsumer extends GenericFileConsumer<File> 
implements ResumeAwa
     }
 
     @Override
-    protected boolean pollDirectory(String fileName, List<GenericFile<File>> 
fileList, int depth) {
+    protected boolean pollDirectory(Exchange dynamic, String fileName, 
List<GenericFile<File>> fileList, int depth) {
         LOG.trace("pollDirectory from fileName: {}", fileName);
 
         File directory = new File(fileName);
@@ -206,7 +209,7 @@ public class FileConsumer extends GenericFileConsumer<File> 
implements ResumeAwa
             return true;
         }
 
-        return pollDirectory(directory, fileList, depth);
+        return pollDirectory(dynamic, directory, fileList, depth);
     }
 
     private File[] listFiles(File directory) {
diff --git 
a/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
 
b/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
index 7f87cb2af1d..de0ffcf65a4 100644
--- 
a/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
+++ 
b/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
@@ -33,6 +33,7 @@ import org.apache.camel.Processor;
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.ShutdownRunningTask;
 import org.apache.camel.support.EmptyAsyncCallback;
+import org.apache.camel.support.MessageHelper;
 import org.apache.camel.support.ScheduledBatchPollingConsumer;
 import org.apache.camel.support.service.ServiceHelper;
 import org.apache.camel.util.CastUtils;
@@ -107,11 +108,16 @@ public abstract class GenericFileConsumer<T> extends 
ScheduledBatchPollingConsum
      */
     protected abstract Exchange createExchange(GenericFile<T> file);
 
+    @Override
+    protected int poll() throws Exception {
+        return poll(null);
+    }
+
     /**
      * Poll for files
      */
     @Override
-    public int poll() throws Exception {
+    protected int poll(Exchange dynamic) throws Exception {
         // must prepare on startup the very first time
         if (!prepareOnStartup) {
             // prepare on startup
@@ -138,7 +144,7 @@ public abstract class GenericFileConsumer<T> extends 
ScheduledBatchPollingConsum
         StopWatch stop = new StopWatch();
         boolean limitHit;
         try {
-            limitHit = !pollDirectory(name, files, 0);
+            limitHit = !pollDirectory(dynamic, name, files, 0);
         } catch (Exception e) {
             // during poll directory we add files to the in progress 
repository,
             // in case of any exception thrown after this work
@@ -336,7 +342,7 @@ public abstract class GenericFileConsumer<T> extends 
ScheduledBatchPollingConsum
      * @return          whether or not to continue polling, <tt>false</tt> 
means the maxMessagesPerPoll limit has been
      *                  hit
      */
-    protected abstract boolean pollDirectory(String fileName, 
List<GenericFile<T>> fileList, int depth);
+    protected abstract boolean pollDirectory(Exchange dynamic, String 
fileName, List<GenericFile<T>> fileList, int depth);
 
     /**
      * Sets the operations to be used.
@@ -429,7 +435,6 @@ public abstract class GenericFileConsumer<T> extends 
ScheduledBatchPollingConsum
         // must use full name when downloading so we have the correct path
         final String name = target.getAbsoluteFilePath();
         try {
-
             if (isRetrieveFile()) {
                 if (!tryRetrievingFile(exchange, name, target, 
absoluteFileName, file)) {
                     return false;
@@ -587,9 +592,10 @@ public abstract class GenericFileConsumer<T> extends 
ScheduledBatchPollingConsum
      * @return             <tt>true</tt> to include the file, <tt>false</tt> 
to skip it
      */
     protected boolean isValidFile(
+            Exchange dynamic,
             Supplier<GenericFile<T>> file, String name, String 
absoluteFilePath,
             Supplier<String> relativeFilePath, boolean isDirectory, T[] files) 
{
-        if (!isMatched(file, name, absoluteFilePath, relativeFilePath, 
isDirectory, files)) {
+        if (!isMatched(dynamic, file, name, absoluteFilePath, 
relativeFilePath, isDirectory, files)) {
             LOG.trace("File did not match. Will skip this file: {}", name);
             return false;
         }
@@ -610,7 +616,7 @@ public abstract class GenericFileConsumer<T> extends 
ScheduledBatchPollingConsum
         // if it is a file then check we have the file in the idempotent 
registry
         // already
         if (Boolean.TRUE.equals(endpoint.isIdempotent())) {
-            if (notUnique(file, absoluteFilePath)) {
+            if (notUnique(dynamic, file, absoluteFilePath)) {
                 return false;
             }
         }
@@ -621,13 +627,13 @@ public abstract class GenericFileConsumer<T> extends 
ScheduledBatchPollingConsum
         return endpoint.getInProgressRepository().add(absoluteFilePath);
     }
 
-    private boolean notUnique(Supplier<GenericFile<T>> file, String 
absoluteFilePath) {
+    private boolean notUnique(Exchange dynamic, Supplier<GenericFile<T>> file, 
String absoluteFilePath) {
         boolean answer = false;
         // use absolute file path as default key, but evaluate if an
         // expression key was configured
         String key = absoluteFilePath;
         if (endpoint.getIdempotentKey() != null) {
-            Exchange dummy = endpoint.createExchange(file.get());
+            Exchange dummy = createDummy(dynamic, file);
             key = endpoint.getIdempotentKey().evaluate(dummy, String.class);
             LOG.trace("Evaluated idempotentKey: {} for file: {}", key, file);
         }
@@ -686,6 +692,7 @@ public abstract class GenericFileConsumer<T> extends 
ScheduledBatchPollingConsum
      * @return                  <tt>true</tt> if the file is matched, 
<tt>false</tt> if not
      */
     protected boolean isMatched(
+            Exchange dynamic,
             Supplier<GenericFile<T>> file, String name, String 
absoluteFilePath,
             Supplier<String> relativeFilePath, boolean isDirectory, T[] files) 
{
 
@@ -722,9 +729,8 @@ public abstract class GenericFileConsumer<T> extends 
ScheduledBatchPollingConsum
         }
 
         if (isDirectory && endpoint.getFilterDirectory() != null) {
-            // create a dummy exchange as Exchange is needed for expression
-            // evaluation
-            Exchange dummy = endpoint.createExchange(file.get());
+            // create a dummy exchange as Exchange is needed for expression 
evaluation
+            Exchange dummy = createDummy(dynamic, file);
             boolean matches = endpoint.getFilterDirectory().matches(dummy);
             if (!matches) {
                 return false;
@@ -742,7 +748,7 @@ public abstract class GenericFileConsumer<T> extends 
ScheduledBatchPollingConsum
 
         if (endpoint.getFileName() != null) {
             // create a dummy exchange as Exchange is needed for expression 
evaluation
-            Exchange dummy = endpoint.createExchange(file.get());
+            Exchange dummy = createDummy(dynamic, file);
             String result = evaluateFileExpression(dummy);
             if (result != null) {
                 if (!name.equals(result)) {
@@ -753,7 +759,7 @@ public abstract class GenericFileConsumer<T> extends 
ScheduledBatchPollingConsum
 
         if (endpoint.getFilterFile() != null) {
             // create a dummy exchange as Exchange is needed for expression 
evaluation
-            Exchange dummy = endpoint.createExchange(file.get());
+            Exchange dummy = createDummy(dynamic, file);
             boolean matches = endpoint.getFilterFile().matches(dummy);
             if (!matches) {
                 return false;
@@ -849,6 +855,20 @@ public abstract class GenericFileConsumer<T> extends 
ScheduledBatchPollingConsum
         return result;
     }
 
+    protected Exchange createDummy(Exchange dynamic, Supplier<GenericFile<T>> 
file) {
+        Exchange dummy = endpoint.createExchange(file.get());
+        if (dynamic != null) {
+            // enrich with data from dynamic source
+            if (dynamic.getMessage().hasHeaders()) {
+                MessageHelper.copyHeaders(dynamic.getMessage(), 
dummy.getMessage(), true);
+                if (dynamic.hasVariables()) {
+                    dummy.getVariables().putAll(dynamic.getVariables());
+                }
+            }
+        }
+        return dummy;
+    }
+
     @SuppressWarnings("unchecked")
     private GenericFile<T> getExchangeFileProperty(Exchange exchange) {
         return (GenericFile<T>) 
exchange.getProperty(ExchangePropertyKey.FILE_EXCHANGE_FILE);
diff --git 
a/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFilePollingConsumer.java
 
b/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFilePollingConsumer.java
index 68c9474b44e..4f29e6358ac 100644
--- 
a/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFilePollingConsumer.java
+++ 
b/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFilePollingConsumer.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.file;
 
 import org.apache.camel.Consumer;
+import org.apache.camel.DynamicPollingConsumer;
 import org.apache.camel.Exchange;
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.spi.PollingConsumerPollStrategy;
@@ -26,7 +27,7 @@ import org.apache.camel.util.StopWatch;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class GenericFilePollingConsumer extends EventDrivenPollingConsumer {
+public class GenericFilePollingConsumer extends EventDrivenPollingConsumer 
implements DynamicPollingConsumer {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(GenericFilePollingConsumer.class);
     private final long delay;
@@ -65,11 +66,11 @@ public class GenericFilePollingConsumer extends 
EventDrivenPollingConsumer {
     }
 
     @Override
-    public Exchange receiveNoWait() {
+    public Exchange receiveNoWait(Exchange exchange) {
         if (LOG.isTraceEnabled()) {
             LOG.trace("receiveNoWait polling file: {}", 
getConsumer().getEndpoint());
         }
-        int polled = doReceive(0);
+        int polled = doReceive(exchange, 0);
         if (polled > 0) {
             return super.receive(0);
         } else {
@@ -78,11 +79,11 @@ public class GenericFilePollingConsumer extends 
EventDrivenPollingConsumer {
     }
 
     @Override
-    public Exchange receive() {
+    public Exchange receive(Exchange exchange) {
         if (LOG.isTraceEnabled()) {
             LOG.trace("receive polling file: {}", getConsumer().getEndpoint());
         }
-        int polled = doReceive(Long.MAX_VALUE);
+        int polled = doReceive(exchange, Long.MAX_VALUE);
         if (polled > 0) {
             return super.receive();
         } else {
@@ -91,11 +92,11 @@ public class GenericFilePollingConsumer extends 
EventDrivenPollingConsumer {
     }
 
     @Override
-    public Exchange receive(long timeout) {
+    public Exchange receive(Exchange exchange, long timeout) {
         if (LOG.isTraceEnabled()) {
             LOG.trace("receive({}) polling file: {}", timeout, 
getConsumer().getEndpoint());
         }
-        int polled = doReceive(timeout);
+        int polled = doReceive(exchange, timeout);
         if (polled > 0) {
             return super.receive(timeout);
         } else {
@@ -103,7 +104,22 @@ public class GenericFilePollingConsumer extends 
EventDrivenPollingConsumer {
         }
     }
 
-    protected int doReceive(long timeout) {
+    @Override
+    public Exchange receiveNoWait() {
+        return receiveNoWait(null);
+    }
+
+    @Override
+    public Exchange receive() {
+        return receive(null);
+    }
+
+    @Override
+    public Exchange receive(long timeout) {
+        return receive(null, timeout);
+    }
+
+    protected int doReceive(Exchange exchange, long timeout) {
         int retryCounter = -1;
         boolean done = false;
         Throwable cause = null;
@@ -130,7 +146,7 @@ public class GenericFilePollingConsumer extends 
EventDrivenPollingConsumer {
                     boolean begin = pollStrategy.begin(getConsumer(), 
getEndpoint());
                     if (begin) {
                         retryCounter++;
-                        polledMessages = getConsumer().poll();
+                        polledMessages = getConsumer().poll(exchange);
                         LOG.trace("Polled {} messages", polledMessages);
 
                         if (polledMessages == 0 && sendEmptyMessageWhenIdle) {
diff --git 
a/core/camel-api/src/main/java/org/apache/camel/DynamicPollingConsumer.java 
b/core/camel-api/src/main/java/org/apache/camel/DynamicPollingConsumer.java
new file mode 100644
index 00000000000..c939c607375
--- /dev/null
+++ b/core/camel-api/src/main/java/org/apache/camel/DynamicPollingConsumer.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel;
+
+/**
+ * A {@link PollingConsumer} that are used by dynamic Poll and PollEnrich EIPs 
to facilitate components that can use
+ * information from the current {@link Exchange} during the poll.
+ */
+public interface DynamicPollingConsumer extends PollingConsumer {
+
+    /**
+     * Waits until a message is available and then returns it. Warning that 
this method could block indefinitely if no
+     * messages are available.
+     * <p/>
+     * Will return <tt>null</tt> if the consumer is not started
+     * <p/>
+     * <b>Important: </b> See the class javadoc about the need for done the 
{@link org.apache.camel.spi.UnitOfWork} on
+     * the returned {@link Exchange}
+     *
+     * @param  exchange the current exchange
+     *
+     * @return          the message exchange received.
+     */
+    Exchange receive(Exchange exchange);
+
+    /**
+     * Attempts to receive a message exchange immediately without waiting and 
returning <tt>null</tt> if a message
+     * exchange is not available yet.
+     * <p/>
+     * <b>Important: </b> See the class javadoc about the need for done the 
{@link org.apache.camel.spi.UnitOfWork} on
+     * the returned {@link Exchange}
+     *
+     * @return the message exchange if one is immediately available otherwise 
<tt>null</tt>
+     */
+    Exchange receiveNoWait(Exchange exchange);
+
+    /**
+     * Attempts to receive a message exchange, waiting up to the given timeout 
to expire if a message is not yet
+     * available.
+     * <p/>
+     * <b>Important: </b> See the class javadoc about the need for done the 
{@link org.apache.camel.spi.UnitOfWork} on
+     * the returned {@link Exchange}
+     *
+     * @param  timeout the amount of time in milliseconds to wait for a 
message before timing out and returning
+     *                 <tt>null</tt>
+     *
+     * @return         the message exchange if one was available within the 
timeout period, or <tt>null</tt> if the
+     *                 timeout expired
+     */
+    Exchange receive(Exchange exchange, long timeout);
+}
diff --git 
a/core/camel-core-model/src/main/java/org/apache/camel/model/PollEnrichDefinition.java
 
b/core/camel-core-model/src/main/java/org/apache/camel/model/PollEnrichDefinition.java
index a13f9df7f69..8ba0f887054 100644
--- 
a/core/camel-core-model/src/main/java/org/apache/camel/model/PollEnrichDefinition.java
+++ 
b/core/camel-core-model/src/main/java/org/apache/camel/model/PollEnrichDefinition.java
@@ -280,6 +280,16 @@ public class PollEnrichDefinition extends ExpressionNode
         return this;
     }
 
+    /**
+     * Whether to auto startup components when poll enricher is starting up.
+     *
+     * @return the builder
+     */
+    public PollEnrichDefinition autoStartComponents(boolean 
autoStartComponents) {
+        setAutoStartComponents(Boolean.toString(autoStartComponents));
+        return this;
+    }
+
     /**
      * Whether to allow components to optimise if they are {@link 
org.apache.camel.spi.SendDynamicAware}.
      *
@@ -290,6 +300,16 @@ public class PollEnrichDefinition extends ExpressionNode
         return this;
     }
 
+    /**
+     * Whether to allow components to optimise if they are {@link 
org.apache.camel.spi.SendDynamicAware}.
+     *
+     * @return the builder
+     */
+    public PollEnrichDefinition allowOptimisedComponents(boolean 
allowOptimisedComponents) {
+        
setAllowOptimisedComponents(Boolean.toString(allowOptimisedComponents));
+        return this;
+    }
+
     // Properties
     // 
-------------------------------------------------------------------------
 
diff --git 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/PollEnricher.java
 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PollEnricher.java
index e2aeb8f66b7..0e81b621b02 100644
--- 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/PollEnricher.java
+++ 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PollEnricher.java
@@ -25,6 +25,7 @@ import org.apache.camel.CamelContextAware;
 import org.apache.camel.CamelExchangeException;
 import org.apache.camel.Component;
 import org.apache.camel.Consumer;
+import org.apache.camel.DynamicPollingConsumer;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePropertyKey;
@@ -323,6 +324,11 @@ public class PollEnricher extends AsyncProcessorSupport 
implements IdAware, Rout
         final Processor preProcessor = preAwareProcessor;
         final Processor postProcessor = postAwareProcessor;
 
+        DynamicPollingConsumer dynamicConsumer = null;
+        if (consumer instanceof DynamicPollingConsumer dyn) {
+            dynamicConsumer = dyn;
+        }
+
         Exchange resourceExchange;
         try {
             if (preProcessor != null) {
@@ -331,15 +337,16 @@ public class PollEnricher extends AsyncProcessorSupport 
implements IdAware, Rout
 
             if (timeout < 0) {
                 LOG.debug("Consumer receive: {}", consumer);
-                resourceExchange = consumer.receive();
+                resourceExchange = dynamicConsumer != null ? 
dynamicConsumer.receive(exchange) : consumer.receive();
             } else if (timeout == 0) {
                 LOG.debug("Consumer receiveNoWait: {}", consumer);
-                resourceExchange = consumer.receiveNoWait();
+                resourceExchange = dynamicConsumer != null ? 
dynamicConsumer.receiveNoWait(exchange) : consumer.receiveNoWait();
             } else {
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("Consumer receive with timeout: {} ms. {}", 
timeout, consumer);
                 }
-                resourceExchange = consumer.receive(timeout);
+                resourceExchange
+                        = dynamicConsumer != null ? 
dynamicConsumer.receive(exchange, timeout) : consumer.receive(timeout);
             }
 
             if (resourceExchange == null) {
diff --git 
a/core/camel-core/src/test/java/org/apache/camel/processor/enricher/PollDynamicFileNameTest.java
 
b/core/camel-core/src/test/java/org/apache/camel/processor/enricher/PollDynamicFileNameOptimizeDisabledTest.java
similarity index 54%
copy from 
core/camel-core/src/test/java/org/apache/camel/processor/enricher/PollDynamicFileNameTest.java
copy to 
core/camel-core/src/test/java/org/apache/camel/processor/enricher/PollDynamicFileNameOptimizeDisabledTest.java
index 7eaf817514b..fcf2c588d51 100644
--- 
a/core/camel-core/src/test/java/org/apache/camel/processor/enricher/PollDynamicFileNameTest.java
+++ 
b/core/camel-core/src/test/java/org/apache/camel/processor/enricher/PollDynamicFileNameOptimizeDisabledTest.java
@@ -19,12 +19,13 @@ package org.apache.camel.processor.enricher;
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Exchange;
 import org.apache.camel.builder.RouteBuilder;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
-public class PollDynamicFileNameTest extends ContextTestSupport {
+public class PollDynamicFileNameOptimizeDisabledTest extends 
ContextTestSupport {
 
     @Test
-    public void testPollEnrichFile() throws Exception {
+    public void testPollEnrichFileOne() throws Exception {
         getMockEndpoint("mock:result").expectedMessageCount(2);
         getMockEndpoint("mock:result").message(0).body().isEqualTo("Hello 
World");
         getMockEndpoint("mock:result").message(1).body().isNull();
@@ -35,6 +36,29 @@ public class PollDynamicFileNameTest extends 
ContextTestSupport {
         template.sendBodyAndHeader("direct:start", "Bar", "target", 
"unknown.txt");
 
         assertMockEndpointsSatisfied();
+
+        // there should only be 1 file endpoint
+        long c = context.getEndpoints().stream()
+                .filter(e -> e.getEndpointKey().startsWith("file") && 
e.getEndpointUri().contains("?fileName=")).count();
+        Assertions.assertEquals(2, c, "There should only be 2 file endpoints");
+    }
+
+    @Test
+    public void testPollEnrichFileTwo() throws Exception {
+        getMockEndpoint("mock:result").expectedBodiesReceivedInAnyOrder("Hello 
World", "Bye World");
+
+        template.sendBodyAndHeader(fileUri(), "Hello World", 
Exchange.FILE_NAME, "myfile.txt");
+        template.sendBodyAndHeader(fileUri(), "Bye World", Exchange.FILE_NAME, 
"myfile2.txt");
+
+        template.sendBodyAndHeader("direct:start", "Foo", "target", 
"myfile.txt");
+        template.sendBodyAndHeader("direct:start", "Bar", "target", 
"myfile2.txt");
+
+        assertMockEndpointsSatisfied();
+
+        // there should only be 1 file endpoint
+        long c = context.getEndpoints().stream()
+                .filter(e -> e.getEndpointKey().startsWith("file") && 
e.getEndpointUri().contains("?fileName=")).count();
+        Assertions.assertEquals(2, c, "There should only be 2 file endpoints");
     }
 
     @Override
@@ -43,7 +67,8 @@ public class PollDynamicFileNameTest extends 
ContextTestSupport {
             @Override
             public void configure() {
                 from("direct:start")
-                        .poll(fileUri() + 
"?noop=true&fileName=${header.target}", 500)
+                        .pollEnrich().simple(fileUri() + 
"?noop=true&fileName=${header.target}")
+                        .allowOptimisedComponents(false).timeout(500)
                         .to("mock:result");
             }
         };
diff --git 
a/core/camel-core/src/test/java/org/apache/camel/processor/enricher/PollDynamicFileNameTest.java
 
b/core/camel-core/src/test/java/org/apache/camel/processor/enricher/PollDynamicFileNameTest.java
index 7eaf817514b..cd80d65f042 100644
--- 
a/core/camel-core/src/test/java/org/apache/camel/processor/enricher/PollDynamicFileNameTest.java
+++ 
b/core/camel-core/src/test/java/org/apache/camel/processor/enricher/PollDynamicFileNameTest.java
@@ -19,12 +19,13 @@ package org.apache.camel.processor.enricher;
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Exchange;
 import org.apache.camel.builder.RouteBuilder;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
 public class PollDynamicFileNameTest extends ContextTestSupport {
 
     @Test
-    public void testPollEnrichFile() throws Exception {
+    public void testPollEnrichFileOne() throws Exception {
         getMockEndpoint("mock:result").expectedMessageCount(2);
         getMockEndpoint("mock:result").message(0).body().isEqualTo("Hello 
World");
         getMockEndpoint("mock:result").message(1).body().isNull();
@@ -35,6 +36,29 @@ public class PollDynamicFileNameTest extends 
ContextTestSupport {
         template.sendBodyAndHeader("direct:start", "Bar", "target", 
"unknown.txt");
 
         assertMockEndpointsSatisfied();
+
+        // there should only be 1 file endpoint
+        long c = context.getEndpoints().stream()
+                .filter(e -> e.getEndpointKey().startsWith("file") && 
e.getEndpointUri().contains("?fileName=")).count();
+        Assertions.assertEquals(1, c, "There should only be 1 file endpoint");
+    }
+
+    @Test
+    public void testPollEnrichFileTwo() throws Exception {
+        getMockEndpoint("mock:result").expectedBodiesReceivedInAnyOrder("Hello 
World", "Bye World");
+
+        template.sendBodyAndHeader(fileUri(), "Hello World", 
Exchange.FILE_NAME, "myfile.txt");
+        template.sendBodyAndHeader(fileUri(), "Bye World", Exchange.FILE_NAME, 
"myfile2.txt");
+
+        template.sendBodyAndHeader("direct:start", "Foo", "target", 
"myfile.txt");
+        template.sendBodyAndHeader("direct:start", "Bar", "target", 
"myfile2.txt");
+
+        assertMockEndpointsSatisfied();
+
+        // there should only be 1 file endpoint
+        long c = context.getEndpoints().stream()
+                .filter(e -> e.getEndpointKey().startsWith("file") && 
e.getEndpointUri().contains("?fileName=")).count();
+        Assertions.assertEquals(1, c, "There should only be 1 file endpoint");
     }
 
     @Override
diff --git 
a/core/camel-support/src/main/java/org/apache/camel/support/ScheduledPollConsumer.java
 
b/core/camel-support/src/main/java/org/apache/camel/support/ScheduledPollConsumer.java
index 658f7896d25..c6f852bbec2 100644
--- 
a/core/camel-support/src/main/java/org/apache/camel/support/ScheduledPollConsumer.java
+++ 
b/core/camel-support/src/main/java/org/apache/camel/support/ScheduledPollConsumer.java
@@ -568,6 +568,18 @@ public abstract class ScheduledPollConsumer extends 
DefaultConsumer
      */
     protected abstract int poll() throws Exception;
 
+    /**
+     * The polling method which is invoked periodically to poll this consumer, 
for components that support
+     * {@link org.apache.camel.DynamicPollingConsumer} such as camel-file.
+     *
+     * @param  dynamic   the current exchange when being used from Poll and 
PollEnrich EIPs in dynamic mode,
+     * @return           number of messages polled, will be <tt>0</tt> if no 
message was polled at all.
+     * @throws Exception can be thrown if an exception occurred during polling
+     */
+    protected int poll(Exchange dynamic) throws Exception {
+        return poll();
+    }
+
     @Override
     protected void doBuild() throws Exception {
         if (getHealthCheck() == null) {

Reply via email to