This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit f27c9b2127680d65f070d3468e52af6d0b4ea01f
Author: Claus Ibsen <[email protected]>
AuthorDate: Wed Feb 26 08:37:55 2025 +0100

    CAMEL-21733: camel-core - Poll EIP to support DynamicAware to reuse 
endpoints during dynamic poll EIP
---
 .../camel/component/file/GenericFileConsumer.java  | 23 ++++------------------
 ...GenericFileDefaultMoveExistingFileStrategy.java |  3 +--
 .../camel/component/file/GenericFileHelper.java    | 19 ++++++++++++++++++
 .../component/file/GenericFileOnCompletion.java    |  4 ++--
 ...dempotentChangedRepositoryReadLockStrategy.java | 11 ++++++-----
 ...IdempotentRenameRepositoryReadLockStrategy.java | 11 ++++++-----
 .../FileIdempotentRepositoryReadLockStrategy.java  | 11 ++++++-----
 7 files changed, 44 insertions(+), 38 deletions(-)

diff --git 
a/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
 
b/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
index de0ffcf65a4..73c0c3c82d9 100644
--- 
a/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
+++ 
b/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
@@ -33,7 +33,6 @@ import org.apache.camel.Processor;
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.ShutdownRunningTask;
 import org.apache.camel.support.EmptyAsyncCallback;
-import org.apache.camel.support.MessageHelper;
 import org.apache.camel.support.ScheduledBatchPollingConsumer;
 import org.apache.camel.support.service.ServiceHelper;
 import org.apache.camel.util.CastUtils;
@@ -633,7 +632,7 @@ public abstract class GenericFileConsumer<T> extends 
ScheduledBatchPollingConsum
         // expression key was configured
         String key = absoluteFilePath;
         if (endpoint.getIdempotentKey() != null) {
-            Exchange dummy = createDummy(dynamic, file);
+            Exchange dummy = GenericFileHelper.createDummy(endpoint, dynamic, 
file);
             key = endpoint.getIdempotentKey().evaluate(dummy, String.class);
             LOG.trace("Evaluated idempotentKey: {} for file: {}", key, file);
         }
@@ -730,7 +729,7 @@ public abstract class GenericFileConsumer<T> extends 
ScheduledBatchPollingConsum
 
         if (isDirectory && endpoint.getFilterDirectory() != null) {
             // create a dummy exchange as Exchange is needed for expression 
evaluation
-            Exchange dummy = createDummy(dynamic, file);
+            Exchange dummy = GenericFileHelper.createDummy(endpoint, dynamic, 
file);
             boolean matches = endpoint.getFilterDirectory().matches(dummy);
             if (!matches) {
                 return false;
@@ -748,7 +747,7 @@ public abstract class GenericFileConsumer<T> extends 
ScheduledBatchPollingConsum
 
         if (endpoint.getFileName() != null) {
             // create a dummy exchange as Exchange is needed for expression 
evaluation
-            Exchange dummy = createDummy(dynamic, file);
+            Exchange dummy = GenericFileHelper.createDummy(endpoint, dynamic, 
file);
             String result = evaluateFileExpression(dummy);
             if (result != null) {
                 if (!name.equals(result)) {
@@ -759,7 +758,7 @@ public abstract class GenericFileConsumer<T> extends 
ScheduledBatchPollingConsum
 
         if (endpoint.getFilterFile() != null) {
             // create a dummy exchange as Exchange is needed for expression 
evaluation
-            Exchange dummy = createDummy(dynamic, file);
+            Exchange dummy = GenericFileHelper.createDummy(endpoint, dynamic, 
file);
             boolean matches = endpoint.getFilterFile().matches(dummy);
             if (!matches) {
                 return false;
@@ -855,20 +854,6 @@ public abstract class GenericFileConsumer<T> extends 
ScheduledBatchPollingConsum
         return result;
     }
 
-    protected Exchange createDummy(Exchange dynamic, Supplier<GenericFile<T>> 
file) {
-        Exchange dummy = endpoint.createExchange(file.get());
-        if (dynamic != null) {
-            // enrich with data from dynamic source
-            if (dynamic.getMessage().hasHeaders()) {
-                MessageHelper.copyHeaders(dynamic.getMessage(), 
dummy.getMessage(), true);
-                if (dynamic.hasVariables()) {
-                    dummy.getVariables().putAll(dynamic.getVariables());
-                }
-            }
-        }
-        return dummy;
-    }
-
     @SuppressWarnings("unchecked")
     private GenericFile<T> getExchangeFileProperty(Exchange exchange) {
         return (GenericFile<T>) 
exchange.getProperty(ExchangePropertyKey.FILE_EXCHANGE_FILE);
diff --git 
a/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileDefaultMoveExistingFileStrategy.java
 
b/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileDefaultMoveExistingFileStrategy.java
index 7f79c8b63fa..619407a61d8 100644
--- 
a/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileDefaultMoveExistingFileStrategy.java
+++ 
b/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileDefaultMoveExistingFileStrategy.java
@@ -41,8 +41,7 @@ public class GenericFileDefaultMoveExistingFileStrategy 
implements FileMoveExist
         // need to evaluate using a dummy and simulate the file first, to have
         // access to all the file attributes
         // create a dummy exchange as Exchange is needed for expression
-        // evaluation
-        // we support only the following 3 tokens.
+        // evaluation we support only the following 3 tokens.
         Exchange dummy = endpoint.createExchange();
         String parent = FileUtil.onlyPath(fileName);
         String onlyName = FileUtil.stripPath(fileName);
diff --git 
a/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileHelper.java
 
b/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileHelper.java
index 300492db037..b86351c010d 100644
--- 
a/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileHelper.java
+++ 
b/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileHelper.java
@@ -16,6 +16,11 @@
  */
 package org.apache.camel.component.file;
 
+import java.util.function.Supplier;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.support.MessageHelper;
+
 public final class GenericFileHelper {
 
     private GenericFileHelper() {
@@ -36,4 +41,18 @@ public final class GenericFileHelper {
         return path + "-" + key;
     }
 
+    public static <T> Exchange createDummy(GenericFileEndpoint<T> endpoint, 
Exchange dynamic, Supplier<GenericFile<T>> file) {
+        Exchange dummy = endpoint.createExchange(file.get());
+        if (dynamic != null) {
+            // enrich with data from dynamic source
+            if (dynamic.getMessage().hasHeaders()) {
+                MessageHelper.copyHeaders(dynamic.getMessage(), 
dummy.getMessage(), true);
+                if (dynamic.hasVariables()) {
+                    dummy.getVariables().putAll(dynamic.getVariables());
+                }
+            }
+        }
+        return dummy;
+    }
+
 }
diff --git 
a/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileOnCompletion.java
 
b/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileOnCompletion.java
index 626bfb98abe..f731aff6faf 100644
--- 
a/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileOnCompletion.java
+++ 
b/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileOnCompletion.java
@@ -116,7 +116,7 @@ public class GenericFileOnCompletion<T> implements 
Synchronization {
             // expression key was configured
             String key = absoluteFileName;
             if (endpoint.getIdempotentKey() != null) {
-                Exchange dummy = endpoint.createExchange(file);
+                Exchange dummy = GenericFileHelper.createDummy(endpoint, 
exchange, () -> file);
                 key = endpoint.getIdempotentKey().evaluate(dummy, 
String.class);
             }
             // only add to idempotent repository if we could process the file
@@ -159,7 +159,7 @@ public class GenericFileOnCompletion<T> implements 
Synchronization {
             // expression key was configured
             String key = absoluteFileName;
             if (endpoint.getIdempotentKey() != null) {
-                Exchange dummy = endpoint.createExchange(file);
+                Exchange dummy = GenericFileHelper.createDummy(endpoint, 
exchange, () -> file);
                 key = endpoint.getIdempotentKey().evaluate(dummy, 
String.class);
             }
             if (key != null) {
diff --git 
a/components/camel-file/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentChangedRepositoryReadLockStrategy.java
 
b/components/camel-file/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentChangedRepositoryReadLockStrategy.java
index 8ee5e6a3878..506d3c0b066 100644
--- 
a/components/camel-file/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentChangedRepositoryReadLockStrategy.java
+++ 
b/components/camel-file/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentChangedRepositoryReadLockStrategy.java
@@ -27,6 +27,7 @@ import org.apache.camel.LoggingLevel;
 import org.apache.camel.component.file.GenericFile;
 import org.apache.camel.component.file.GenericFileEndpoint;
 import org.apache.camel.component.file.GenericFileExclusiveReadLockStrategy;
+import org.apache.camel.component.file.GenericFileHelper;
 import org.apache.camel.component.file.GenericFileOperations;
 import org.apache.camel.spi.CamelLogger;
 import org.apache.camel.spi.IdempotentRepository;
@@ -85,7 +86,7 @@ public class FileIdempotentChangedRepositoryReadLockStrategy 
extends ServiceSupp
         }
 
         // check if we can begin on this file
-        String key = asKey(file);
+        String key = asKey(exchange, file);
         boolean answer = false;
         try {
             answer = idempotentRepository.add(exchange, key);
@@ -121,7 +122,7 @@ public class 
FileIdempotentChangedRepositoryReadLockStrategy extends ServiceSupp
     public void releaseExclusiveReadLockOnRollback(
             GenericFileOperations<File> operations, GenericFile<File> file, 
Exchange exchange)
             throws Exception {
-        String key = asKey(file);
+        String key = asKey(exchange, file);
         Runnable r = () -> {
             if (removeOnRollback) {
                 idempotentRepository.remove(exchange, key);
@@ -158,7 +159,7 @@ public class 
FileIdempotentChangedRepositoryReadLockStrategy extends ServiceSupp
     public void releaseExclusiveReadLockOnCommit(
             GenericFileOperations<File> operations, GenericFile<File> file, 
Exchange exchange)
             throws Exception {
-        String key = asKey(file);
+        String key = asKey(exchange, file);
         Runnable r = () -> {
             if (removeOnCommit) {
                 idempotentRepository.remove(exchange, key);
@@ -311,12 +312,12 @@ public class 
FileIdempotentChangedRepositoryReadLockStrategy extends ServiceSupp
         this.readLockIdempotentReleaseExecutorService = 
readLockIdempotentReleaseExecutorService;
     }
 
-    protected String asKey(GenericFile<File> file) {
+    protected String asKey(Exchange exchange, GenericFile<File> file) {
         // use absolute file path as default key, but evaluate if an expression
         // key was configured
         String key = file.getAbsoluteFilePath();
         if (endpoint.getIdempotentKey() != null) {
-            Exchange dummy = endpoint.createExchange(file);
+            Exchange dummy = GenericFileHelper.createDummy(endpoint, exchange, 
() -> file);
             key = endpoint.getIdempotentKey().evaluate(dummy, String.class);
         }
         return key;
diff --git 
a/components/camel-file/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRenameRepositoryReadLockStrategy.java
 
b/components/camel-file/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRenameRepositoryReadLockStrategy.java
index ada6a844f90..62538cf86a5 100644
--- 
a/components/camel-file/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRenameRepositoryReadLockStrategy.java
+++ 
b/components/camel-file/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRenameRepositoryReadLockStrategy.java
@@ -25,6 +25,7 @@ import org.apache.camel.LoggingLevel;
 import org.apache.camel.component.file.GenericFile;
 import org.apache.camel.component.file.GenericFileEndpoint;
 import org.apache.camel.component.file.GenericFileExclusiveReadLockStrategy;
+import org.apache.camel.component.file.GenericFileHelper;
 import org.apache.camel.component.file.GenericFileOperations;
 import org.apache.camel.spi.CamelLogger;
 import org.apache.camel.spi.IdempotentRepository;
@@ -77,7 +78,7 @@ public class FileIdempotentRenameRepositoryReadLockStrategy 
extends ServiceSuppo
         }
 
         // check if we can begin on this file
-        String key = asKey(file);
+        String key = asKey(exchange, file);
         boolean answer = false;
         try {
             answer = idempotentRepository.add(exchange, key);
@@ -113,7 +114,7 @@ public class FileIdempotentRenameRepositoryReadLockStrategy 
extends ServiceSuppo
     public void releaseExclusiveReadLockOnRollback(
             GenericFileOperations<File> operations, GenericFile<File> file, 
Exchange exchange)
             throws Exception {
-        String key = asKey(file);
+        String key = asKey(exchange, file);
         if (removeOnRollback) {
             idempotentRepository.remove(exchange, key);
         } else {
@@ -128,7 +129,7 @@ public class FileIdempotentRenameRepositoryReadLockStrategy 
extends ServiceSuppo
     public void releaseExclusiveReadLockOnCommit(
             GenericFileOperations<File> operations, GenericFile<File> file, 
Exchange exchange)
             throws Exception {
-        String key = asKey(file);
+        String key = asKey(exchange, file);
         if (removeOnCommit) {
             idempotentRepository.remove(exchange, key);
         } else {
@@ -225,12 +226,12 @@ public class 
FileIdempotentRenameRepositoryReadLockStrategy extends ServiceSuppo
         this.removeOnCommit = removeOnCommit;
     }
 
-    protected String asKey(GenericFile<File> file) {
+    protected String asKey(Exchange exchange, GenericFile<File> file) {
         // use absolute file path as default key, but evaluate if an expression
         // key was configured
         String key = file.getAbsoluteFilePath();
         if (endpoint.getIdempotentKey() != null) {
-            Exchange dummy = endpoint.createExchange(file);
+            Exchange dummy = GenericFileHelper.createDummy(endpoint, exchange, 
() -> file);
             key = endpoint.getIdempotentKey().evaluate(dummy, String.class);
         }
         return key;
diff --git 
a/components/camel-file/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java
 
b/components/camel-file/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java
index adb4098b764..abbf18f8361 100644
--- 
a/components/camel-file/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java
+++ 
b/components/camel-file/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java
@@ -27,6 +27,7 @@ import org.apache.camel.LoggingLevel;
 import org.apache.camel.component.file.GenericFile;
 import org.apache.camel.component.file.GenericFileEndpoint;
 import org.apache.camel.component.file.GenericFileExclusiveReadLockStrategy;
+import org.apache.camel.component.file.GenericFileHelper;
 import org.apache.camel.component.file.GenericFileOperations;
 import org.apache.camel.spi.CamelLogger;
 import org.apache.camel.spi.IdempotentRepository;
@@ -74,7 +75,7 @@ public class FileIdempotentRepositoryReadLockStrategy extends 
ServiceSupport
         }
 
         // check if we can begin on this file
-        String key = asKey(file);
+        String key = asKey(exchange, file);
         boolean answer = false;
         try {
             answer = idempotentRepository.add(exchange, key);
@@ -101,7 +102,7 @@ public class FileIdempotentRepositoryReadLockStrategy 
extends ServiceSupport
     public void releaseExclusiveReadLockOnRollback(
             GenericFileOperations<File> operations, GenericFile<File> file, 
Exchange exchange)
             throws Exception {
-        String key = asKey(file);
+        String key = asKey(exchange, file);
         Runnable r = () -> {
             if (removeOnRollback) {
                 idempotentRepository.remove(exchange, key);
@@ -118,7 +119,7 @@ public class FileIdempotentRepositoryReadLockStrategy 
extends ServiceSupport
     public void releaseExclusiveReadLockOnCommit(
             GenericFileOperations<File> operations, GenericFile<File> file, 
Exchange exchange)
             throws Exception {
-        String key = asKey(file);
+        String key = asKey(exchange, file);
         Runnable r = () -> {
             if (removeOnCommit) {
                 idempotentRepository.remove(exchange, key);
@@ -274,12 +275,12 @@ public class FileIdempotentRepositoryReadLockStrategy 
extends ServiceSupport
         this.readLockIdempotentReleaseExecutorService = 
readLockIdempotentReleaseExecutorService;
     }
 
-    protected String asKey(GenericFile<File> file) {
+    protected String asKey(Exchange exchange, GenericFile<File> file) {
         // use absolute file path as default key, but evaluate if an expression
         // key was configured
         String key = file.getAbsoluteFilePath();
         if (endpoint.getIdempotentKey() != null) {
-            Exchange dummy = endpoint.createExchange(file);
+            Exchange dummy = GenericFileHelper.createDummy(endpoint, exchange, 
() -> file);
             key = endpoint.getIdempotentKey().evaluate(dummy, String.class);
         }
         return key;

Reply via email to