Updated Branches:
  refs/heads/camel-2.11.x e348badd8 -> 8ea81d59b

CAMEL-6936: Fixed file/ftp consumer with idempotent=true, which might not
detect a changed file as a new file because the file was still regarded as
in-progress.

Conflicts:
        camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/8ea81d59
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/8ea81d59
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/8ea81d59

Branch: refs/heads/camel-2.11.x
Commit: 8ea81d59b7cb6c426af385f4d0555b1597d899a9
Parents: e348bad
Author: Claus Ibsen <[email protected]>
Authored: Mon Nov 11 12:19:24 2013 +0100
Committer: Claus Ibsen <[email protected]>
Committed: Mon Nov 11 12:32:03 2013 +0100

----------------------------------------------------------------------
 .../component/file/GenericFileConsumer.java     | 31 +++++++++++++-------
 .../file/FileConsumerIdempotentTest.java        | 13 ++++++--
 2 files changed, 32 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/8ea81d59/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
index 7317fe6..9e297f5 100644
--- a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
+++ b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
@@ -429,21 +429,32 @@ public abstract class GenericFileConsumer<T> extends 
ScheduledBatchPollingConsum
             return false;
         }
 
-        if (endpoint.isIdempotent()) {
-            // use absolute file path as default key, but evaluate if an 
expression key was configured
-            String key = file.getAbsoluteFilePath();
-            if (endpoint.getIdempotentKey() != null) {
-                Exchange dummy = endpoint.createExchange(file);
-                key = endpoint.getIdempotentKey().evaluate(dummy, 
String.class);
+        boolean answer = true;
+        String key = null;
+        try {
+            // if its a file then check we have the file in the idempotent 
registry already
+            if (endpoint.isIdempotent()) {
+                // use absolute file path as default key, but evaluate if an 
expression key was configured
+                key = file.getAbsoluteFilePath();
+                if (endpoint.getIdempotentKey() != null) {
+                    Exchange dummy = endpoint.createExchange(file);
+                    key = endpoint.getIdempotentKey().evaluate(dummy, 
String.class);
+                }
+                if (key != null && 
endpoint.getIdempotentRepository().contains(key)) {
+                    log.trace("This consumer is idempotent and the file has 
been consumed before. Will skip this file: {}", file);
+                    answer = false;
+                }
             }
-            if (key != null && 
endpoint.getIdempotentRepository().contains(key)) {
-                log.trace("This consumer is idempotent and the file has been 
consumed before. Will skip this file: {}", file);
-                return false;
+        } finally {
+            // ensure to run this in finally block in case of runtime 
exceptions being thrown
+            if (!answer) {
+                // remove file from the in progress list as its no longer in 
progress
+                endpoint.getInProgressRepository().remove(key);
             }
         }
 
         // file matched
-        return true;
+        return answer;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/camel/blob/8ea81d59/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentTest.java b/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentTest.java
index 4d1849a..7ae8b73 100644
--- a/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentTest.java
+++ b/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentTest.java
@@ -22,12 +22,15 @@ import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Exchange;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.idempotent.MemoryIdempotentRepository;
 
 /**
  * Unit test for the idempotent=true option.
  */
 public class FileConsumerIdempotentTest extends ContextTestSupport {
 
+    private String uri = 
"file://target/idempotent/?idempotent=true&move=done/${file:name}&delay=10";
+
     @Override
     protected void setUp() throws Exception {
         deleteDirectory("target/idempotent");
@@ -39,8 +42,8 @@ public class FileConsumerIdempotentTest extends 
ContextTestSupport {
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
             public void configure() throws Exception {
-                
from("file://target/idempotent/?idempotent=true&move=done/${file:name}&delay=10")
-                        .convertBodyTo(String.class).to("mock:result");
+                from(uri)
+                    .convertBodyTo(String.class).to("mock:result");
             }
         };
     }
@@ -66,6 +69,12 @@ public class FileConsumerIdempotentTest extends 
ContextTestSupport {
         // should NOT consume the file again, let a bit time pass to let the 
consumer try to consume it but it should not
         Thread.sleep(100);
         assertMockEndpointsSatisfied();
+
+        FileEndpoint fe = context.getEndpoint(uri, FileEndpoint.class);
+        assertNotNull(fe);
+
+        MemoryIdempotentRepository repo = (MemoryIdempotentRepository) 
fe.getInProgressRepository();
+        assertEquals("Should be no in-progress files", 0, repo.getCacheSize());
     }
 
 }
\ No newline at end of file

Reply via email to