Updated Branches: refs/heads/camel-2.12.x dbf57fcb8 -> cebaa3692 refs/heads/master e32f672e5 -> eceb3fca7
CAMEL-6936: Fixed file/ftp consumer when idempotent=true may not detect file changed as new file, due the file was regarded as still in-progress. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/4954d573 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/4954d573 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/4954d573 Branch: refs/heads/master Commit: 4954d573c63a0eff222ea021974aeb39058a8cb8 Parents: e32f672 Author: Claus Ibsen <[email protected]> Authored: Mon Nov 11 12:19:24 2013 +0100 Committer: Claus Ibsen <[email protected]> Committed: Mon Nov 11 12:19:24 2013 +0100 ---------------------------------------------------------------------- .../component/file/GenericFileConsumer.java | 33 +++++++++++++------- .../file/FileConsumerIdempotentTest.java | 13 ++++++-- 2 files changed, 33 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/4954d573/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java index c8452fd..02130d2 100644 --- a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java +++ b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java @@ -484,22 +484,32 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum return false; } - // if its a file then check we have the file in the idempotent registry already - if (!isDirectory && endpoint.isIdempotent()) { - // use absolute file path as default key, but evaluate if an expression key was configured - String key = file.getAbsoluteFilePath(); - if (endpoint.getIdempotentKey() != null) { - Exchange dummy = endpoint.createExchange(file); - key = endpoint.getIdempotentKey().evaluate(dummy, String.class); + boolean answer = true; + String key = null; + try { + // if its a file then check we have the file in the idempotent registry already + if (!isDirectory && endpoint.isIdempotent()) { + // use absolute file path as default key, but evaluate if an expression key was configured + key = file.getAbsoluteFilePath(); + if (endpoint.getIdempotentKey() != null) { + Exchange dummy = endpoint.createExchange(file); + key = endpoint.getIdempotentKey().evaluate(dummy, String.class); + } + if (key != null && endpoint.getIdempotentRepository().contains(key)) { + log.trace("This consumer is idempotent and the file has been consumed before. Will skip this file: {}", file); + answer = false; + } } - if (key != null && endpoint.getIdempotentRepository().contains(key)) { - log.trace("This consumer is idempotent and the file has been consumed before. Will skip this file: {}", file); - return false; + } finally { + // ensure to run this in finally block in case of runtime exceptions being thrown + if (!answer) { + // remove file from the in progress list as its no longer in progress + endpoint.getInProgressRepository().remove(key); } } // file matched - return true; + return answer; } /** @@ -607,6 +617,7 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum */ protected boolean isInProgress(GenericFile<T> file) { String key = file.getAbsoluteFilePath(); + // must use add, to have operation as atomic return !endpoint.getInProgressRepository().add(key); } http://git-wip-us.apache.org/repos/asf/camel/blob/4954d573/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentTest.java b/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentTest.java index 4d1849a..7ae8b73 100644 --- a/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentTest.java +++ b/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentTest.java @@ -22,12 +22,15 @@ import org.apache.camel.ContextTestSupport; import org.apache.camel.Exchange; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.processor.idempotent.MemoryIdempotentRepository; /** * Unit test for the idempotent=true option. */ public class FileConsumerIdempotentTest extends ContextTestSupport { + private String uri = "file://target/idempotent/?idempotent=true&move=done/${file:name}&delay=10"; + @Override protected void setUp() throws Exception { deleteDirectory("target/idempotent"); @@ -39,8 +42,8 @@ public class FileConsumerIdempotentTest extends ContextTestSupport { protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { public void configure() throws Exception { - from("file://target/idempotent/?idempotent=true&move=done/${file:name}&delay=10") - .convertBodyTo(String.class).to("mock:result"); + from(uri) + .convertBodyTo(String.class).to("mock:result"); } }; } @@ -66,6 +69,12 @@ public class FileConsumerIdempotentTest extends ContextTestSupport { // should NOT consume the file again, let a bit time pass to let the consumer try to consume it but it should not Thread.sleep(100); assertMockEndpointsSatisfied(); + + FileEndpoint fe = context.getEndpoint(uri, FileEndpoint.class); + assertNotNull(fe); + + MemoryIdempotentRepository repo = (MemoryIdempotentRepository) fe.getInProgressRepository(); + assertEquals("Should be no in-progress files", 0, repo.getCacheSize()); } } \ No newline at end of file
