This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
commit f27c9b2127680d65f070d3468e52af6d0b4ea01f Author: Claus Ibsen <[email protected]> AuthorDate: Wed Feb 26 08:37:55 2025 +0100 CAMEL-21733: camel-core - Poll EIP to support DynamicAware to reuse endpoints during dynamic poll EIP --- .../camel/component/file/GenericFileConsumer.java | 23 ++++------------------ ...GenericFileDefaultMoveExistingFileStrategy.java | 3 +-- .../camel/component/file/GenericFileHelper.java | 19 ++++++++++++++++++ .../component/file/GenericFileOnCompletion.java | 4 ++-- ...dempotentChangedRepositoryReadLockStrategy.java | 11 ++++++----- ...IdempotentRenameRepositoryReadLockStrategy.java | 11 ++++++----- .../FileIdempotentRepositoryReadLockStrategy.java | 11 ++++++----- 7 files changed, 44 insertions(+), 38 deletions(-) diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java b/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java index de0ffcf65a4..73c0c3c82d9 100644 --- a/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java +++ b/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java @@ -33,7 +33,6 @@ import org.apache.camel.Processor; import org.apache.camel.RuntimeCamelException; import org.apache.camel.ShutdownRunningTask; import org.apache.camel.support.EmptyAsyncCallback; -import org.apache.camel.support.MessageHelper; import org.apache.camel.support.ScheduledBatchPollingConsumer; import org.apache.camel.support.service.ServiceHelper; import org.apache.camel.util.CastUtils; @@ -633,7 +632,7 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum // expression key was configured String key = absoluteFilePath; if (endpoint.getIdempotentKey() != null) { - Exchange dummy = createDummy(dynamic, file); + Exchange dummy = GenericFileHelper.createDummy(endpoint, dynamic, file); key = endpoint.getIdempotentKey().evaluate(dummy, String.class); LOG.trace("Evaluated idempotentKey: {} for file: {}", key, file); } @@ -730,7 +729,7 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum if (isDirectory && endpoint.getFilterDirectory() != null) { // create a dummy exchange as Exchange is needed for expression evaluation - Exchange dummy = createDummy(dynamic, file); + Exchange dummy = GenericFileHelper.createDummy(endpoint, dynamic, file); boolean matches = endpoint.getFilterDirectory().matches(dummy); if (!matches) { return false; @@ -748,7 +747,7 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum if (endpoint.getFileName() != null) { // create a dummy exchange as Exchange is needed for expression evaluation - Exchange dummy = createDummy(dynamic, file); + Exchange dummy = GenericFileHelper.createDummy(endpoint, dynamic, file); String result = evaluateFileExpression(dummy); if (result != null) { if (!name.equals(result)) { @@ -759,7 +758,7 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum if (endpoint.getFilterFile() != null) { // create a dummy exchange as Exchange is needed for expression evaluation - Exchange dummy = createDummy(dynamic, file); + Exchange dummy = GenericFileHelper.createDummy(endpoint, dynamic, file); boolean matches = endpoint.getFilterFile().matches(dummy); if (!matches) { return false; @@ -855,20 +854,6 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum return result; } - protected Exchange createDummy(Exchange dynamic, Supplier<GenericFile<T>> file) { - Exchange dummy = endpoint.createExchange(file.get()); - if (dynamic != null) { - // enrich with data from dynamic source - if (dynamic.getMessage().hasHeaders()) { - MessageHelper.copyHeaders(dynamic.getMessage(), dummy.getMessage(), true); - if (dynamic.hasVariables()) { - dummy.getVariables().putAll(dynamic.getVariables()); - } - } - } - return dummy; - } - @SuppressWarnings("unchecked") private GenericFile<T> getExchangeFileProperty(Exchange exchange) { return (GenericFile<T>) exchange.getProperty(ExchangePropertyKey.FILE_EXCHANGE_FILE); diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileDefaultMoveExistingFileStrategy.java b/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileDefaultMoveExistingFileStrategy.java index 7f79c8b63fa..619407a61d8 100644 --- a/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileDefaultMoveExistingFileStrategy.java +++ b/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileDefaultMoveExistingFileStrategy.java @@ -41,8 +41,7 @@ public class GenericFileDefaultMoveExistingFileStrategy implements FileMoveExist // need to evaluate using a dummy and simulate the file first, to have // access to all the file attributes // create a dummy exchange as Exchange is needed for expression - // evaluation - // we support only the following 3 tokens. + // evaluation we support only the following 3 tokens. Exchange dummy = endpoint.createExchange(); String parent = FileUtil.onlyPath(fileName); String onlyName = FileUtil.stripPath(fileName); diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileHelper.java b/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileHelper.java index 300492db037..b86351c010d 100644 --- a/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileHelper.java +++ b/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileHelper.java @@ -16,6 +16,11 @@ */ package org.apache.camel.component.file; +import java.util.function.Supplier; + +import org.apache.camel.Exchange; +import org.apache.camel.support.MessageHelper; + public final class GenericFileHelper { private GenericFileHelper() { @@ -36,4 +41,18 @@ public final class GenericFileHelper { return path + "-" + key; } + public static <T> Exchange createDummy(GenericFileEndpoint<T> endpoint, Exchange dynamic, Supplier<GenericFile<T>> file) { + Exchange dummy = endpoint.createExchange(file.get()); + if (dynamic != null) { + // enrich with data from dynamic source + if (dynamic.getMessage().hasHeaders()) { + MessageHelper.copyHeaders(dynamic.getMessage(), dummy.getMessage(), true); + if (dynamic.hasVariables()) { + dummy.getVariables().putAll(dynamic.getVariables()); + } + } + } + return dummy; + } + } diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileOnCompletion.java b/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileOnCompletion.java index 626bfb98abe..f731aff6faf 100644 --- a/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileOnCompletion.java +++ b/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileOnCompletion.java @@ -116,7 +116,7 @@ public class GenericFileOnCompletion<T> implements Synchronization { // expression key was configured String key = absoluteFileName; if (endpoint.getIdempotentKey() != null) { - Exchange dummy = endpoint.createExchange(file); + Exchange dummy = GenericFileHelper.createDummy(endpoint, exchange, () -> file); key = endpoint.getIdempotentKey().evaluate(dummy, String.class); } // only add to idempotent repository if we could process the file @@ -159,7 +159,7 @@ public class GenericFileOnCompletion<T> implements Synchronization { // expression key was configured String key = absoluteFileName; if (endpoint.getIdempotentKey() != null) { - Exchange dummy = endpoint.createExchange(file); + Exchange dummy = GenericFileHelper.createDummy(endpoint, exchange, () -> file); key = endpoint.getIdempotentKey().evaluate(dummy, String.class); } if (key != null) { diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentChangedRepositoryReadLockStrategy.java b/components/camel-file/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentChangedRepositoryReadLockStrategy.java index 8ee5e6a3878..506d3c0b066 100644 --- a/components/camel-file/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentChangedRepositoryReadLockStrategy.java +++ b/components/camel-file/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentChangedRepositoryReadLockStrategy.java @@ -27,6 +27,7 @@ import org.apache.camel.LoggingLevel; import org.apache.camel.component.file.GenericFile; import org.apache.camel.component.file.GenericFileEndpoint; import org.apache.camel.component.file.GenericFileExclusiveReadLockStrategy; +import org.apache.camel.component.file.GenericFileHelper; import org.apache.camel.component.file.GenericFileOperations; import org.apache.camel.spi.CamelLogger; import org.apache.camel.spi.IdempotentRepository; @@ -85,7 +86,7 @@ public class FileIdempotentChangedRepositoryReadLockStrategy extends ServiceSupp } // check if we can begin on this file - String key = asKey(file); + String key = asKey(exchange, file); boolean answer = false; try { answer = idempotentRepository.add(exchange, key); @@ -121,7 +122,7 @@ public class FileIdempotentChangedRepositoryReadLockStrategy extends ServiceSupp public void releaseExclusiveReadLockOnRollback( GenericFileOperations<File> operations, GenericFile<File> file, Exchange exchange) throws Exception { - String key = asKey(file); + String key = asKey(exchange, file); Runnable r = () -> { if (removeOnRollback) { idempotentRepository.remove(exchange, key); @@ -158,7 +159,7 @@ public class FileIdempotentChangedRepositoryReadLockStrategy extends ServiceSupp public void releaseExclusiveReadLockOnCommit( GenericFileOperations<File> operations, GenericFile<File> file, Exchange exchange) throws Exception { - String key = asKey(file); + String key = asKey(exchange, file); Runnable r = () -> { if (removeOnCommit) { idempotentRepository.remove(exchange, key); @@ -311,12 +312,12 @@ public class FileIdempotentChangedRepositoryReadLockStrategy extends ServiceSupp this.readLockIdempotentReleaseExecutorService = readLockIdempotentReleaseExecutorService; } - protected String asKey(GenericFile<File> file) { + protected String asKey(Exchange exchange, GenericFile<File> file) { // use absolute file path as default key, but evaluate if an expression // key was configured String key = file.getAbsoluteFilePath(); if (endpoint.getIdempotentKey() != null) { - Exchange dummy = endpoint.createExchange(file); + Exchange dummy = GenericFileHelper.createDummy(endpoint, exchange, () -> file); key = endpoint.getIdempotentKey().evaluate(dummy, String.class); } return key; diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRenameRepositoryReadLockStrategy.java b/components/camel-file/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRenameRepositoryReadLockStrategy.java index ada6a844f90..62538cf86a5 100644 --- a/components/camel-file/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRenameRepositoryReadLockStrategy.java +++ b/components/camel-file/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRenameRepositoryReadLockStrategy.java @@ -25,6 +25,7 @@ import org.apache.camel.LoggingLevel; import org.apache.camel.component.file.GenericFile; import org.apache.camel.component.file.GenericFileEndpoint; import org.apache.camel.component.file.GenericFileExclusiveReadLockStrategy; +import org.apache.camel.component.file.GenericFileHelper; import org.apache.camel.component.file.GenericFileOperations; import org.apache.camel.spi.CamelLogger; import org.apache.camel.spi.IdempotentRepository; @@ -77,7 +78,7 @@ public class FileIdempotentRenameRepositoryReadLockStrategy extends ServiceSuppo } // check if we can begin on this file - String key = asKey(file); + String key = asKey(exchange, file); boolean answer = false; try { answer = idempotentRepository.add(exchange, key); @@ -113,7 +114,7 @@ public class FileIdempotentRenameRepositoryReadLockStrategy extends ServiceSuppo public void releaseExclusiveReadLockOnRollback( GenericFileOperations<File> operations, GenericFile<File> file, Exchange exchange) throws Exception { - String key = asKey(file); + String key = asKey(exchange, file); if (removeOnRollback) { idempotentRepository.remove(exchange, key); } else { @@ -128,7 +129,7 @@ public class FileIdempotentRenameRepositoryReadLockStrategy extends ServiceSuppo public void releaseExclusiveReadLockOnCommit( GenericFileOperations<File> operations, GenericFile<File> file, Exchange exchange) throws Exception { - String key = asKey(file); + String key = asKey(exchange, file); if (removeOnCommit) { idempotentRepository.remove(exchange, key); } else { @@ -225,12 +226,12 @@ public class FileIdempotentRenameRepositoryReadLockStrategy extends ServiceSuppo this.removeOnCommit = removeOnCommit; } - protected String asKey(GenericFile<File> file) { + protected String asKey(Exchange exchange, GenericFile<File> file) { // use absolute file path as default key, but evaluate if an expression // key was configured String key = file.getAbsoluteFilePath(); if (endpoint.getIdempotentKey() != null) { - Exchange dummy = endpoint.createExchange(file); + Exchange dummy = GenericFileHelper.createDummy(endpoint, exchange, () -> file); key = endpoint.getIdempotentKey().evaluate(dummy, String.class); } return key; diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java b/components/camel-file/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java index adb4098b764..abbf18f8361 100644 --- a/components/camel-file/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java +++ b/components/camel-file/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java @@ -27,6 +27,7 @@ import org.apache.camel.LoggingLevel; import org.apache.camel.component.file.GenericFile; import org.apache.camel.component.file.GenericFileEndpoint; import org.apache.camel.component.file.GenericFileExclusiveReadLockStrategy; +import org.apache.camel.component.file.GenericFileHelper; import org.apache.camel.component.file.GenericFileOperations; import org.apache.camel.spi.CamelLogger; import org.apache.camel.spi.IdempotentRepository; @@ -74,7 +75,7 @@ public class FileIdempotentRepositoryReadLockStrategy extends ServiceSupport } // check if we can begin on this file - String key = asKey(file); + String key = asKey(exchange, file); boolean answer = false; try { answer = idempotentRepository.add(exchange, key); @@ -101,7 +102,7 @@ public class FileIdempotentRepositoryReadLockStrategy extends ServiceSupport public void releaseExclusiveReadLockOnRollback( GenericFileOperations<File> operations, GenericFile<File> file, Exchange exchange) throws Exception { - String key = asKey(file); + String key = asKey(exchange, file); Runnable r = () -> { if (removeOnRollback) { idempotentRepository.remove(exchange, key); @@ -118,7 +119,7 @@ public class FileIdempotentRepositoryReadLockStrategy extends ServiceSupport public void releaseExclusiveReadLockOnCommit( GenericFileOperations<File> operations, GenericFile<File> file, Exchange exchange) throws Exception { - String key = asKey(file); + String key = asKey(exchange, file); Runnable r = () -> { if (removeOnCommit) { idempotentRepository.remove(exchange, key); @@ -274,12 +275,12 @@ public class FileIdempotentRepositoryReadLockStrategy extends ServiceSupport this.readLockIdempotentReleaseExecutorService = readLockIdempotentReleaseExecutorService; } - protected String asKey(GenericFile<File> file) { + protected String asKey(Exchange exchange, GenericFile<File> file) { // use absolute file path as default key, but evaluate if an expression // key was configured String key = file.getAbsoluteFilePath(); if (endpoint.getIdempotentKey() != null) { - Exchange dummy = endpoint.createExchange(file); + Exchange dummy = GenericFileHelper.createDummy(endpoint, exchange, () -> file); key = endpoint.getIdempotentKey().evaluate(dummy, String.class); } return key;
