Updated Branches:
  refs/heads/camel-2.12.x dbf57fcb8 -> cebaa3692
  refs/heads/master e32f672e5 -> eceb3fca7


CAMEL-6936: Fixed the file/ftp consumer which, when idempotent=true, may not 
detect a changed file as a new file, because the file was still regarded as in-progress.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/4954d573
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/4954d573
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/4954d573

Branch: refs/heads/master
Commit: 4954d573c63a0eff222ea021974aeb39058a8cb8
Parents: e32f672
Author: Claus Ibsen <[email protected]>
Authored: Mon Nov 11 12:19:24 2013 +0100
Committer: Claus Ibsen <[email protected]>
Committed: Mon Nov 11 12:19:24 2013 +0100

----------------------------------------------------------------------
 .../component/file/GenericFileConsumer.java     | 33 +++++++++++++-------
 .../file/FileConsumerIdempotentTest.java        | 13 ++++++--
 2 files changed, 33 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/4954d573/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
 
b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
index c8452fd..02130d2 100644
--- 
a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
+++ 
b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
@@ -484,22 +484,32 @@ public abstract class GenericFileConsumer<T> extends 
ScheduledBatchPollingConsum
             return false;
         }
 
-        // if its a file then check we have the file in the idempotent 
registry already
-        if (!isDirectory && endpoint.isIdempotent()) {
-            // use absolute file path as default key, but evaluate if an 
expression key was configured
-            String key = file.getAbsoluteFilePath();
-            if (endpoint.getIdempotentKey() != null) {
-                Exchange dummy = endpoint.createExchange(file);
-                key = endpoint.getIdempotentKey().evaluate(dummy, 
String.class);
+        boolean answer = true;
+        String key = null;
+        try {
+            // if its a file then check we have the file in the idempotent 
registry already
+            if (!isDirectory && endpoint.isIdempotent()) {
+                // use absolute file path as default key, but evaluate if an 
expression key was configured
+                key = file.getAbsoluteFilePath();
+                if (endpoint.getIdempotentKey() != null) {
+                    Exchange dummy = endpoint.createExchange(file);
+                    key = endpoint.getIdempotentKey().evaluate(dummy, 
String.class);
+                }
+                if (key != null && 
endpoint.getIdempotentRepository().contains(key)) {
+                    log.trace("This consumer is idempotent and the file has 
been consumed before. Will skip this file: {}", file);
+                    answer = false;
+                }
             }
-            if (key != null && 
endpoint.getIdempotentRepository().contains(key)) {
-                log.trace("This consumer is idempotent and the file has been 
consumed before. Will skip this file: {}", file);
-                return false;
+        } finally {
+            // ensure to run this in finally block in case of runtime 
exceptions being thrown
+            if (!answer) {
+                // remove file from the in progress list as its no longer in 
progress
+                endpoint.getInProgressRepository().remove(key);
             }
         }
 
         // file matched
-        return true;
+        return answer;
     }
 
     /**
@@ -607,6 +617,7 @@ public abstract class GenericFileConsumer<T> extends 
ScheduledBatchPollingConsum
      */
     protected boolean isInProgress(GenericFile<T> file) {
         String key = file.getAbsoluteFilePath();
+        // must use add, to have operation as atomic
         return !endpoint.getInProgressRepository().add(key);
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/4954d573/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentTest.java
 
b/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentTest.java
index 4d1849a..7ae8b73 100644
--- 
a/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentTest.java
+++ 
b/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentTest.java
@@ -22,12 +22,15 @@ import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Exchange;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.idempotent.MemoryIdempotentRepository;
 
 /**
  * Unit test for the idempotent=true option.
  */
 public class FileConsumerIdempotentTest extends ContextTestSupport {
 
+    private String uri = 
"file://target/idempotent/?idempotent=true&move=done/${file:name}&delay=10";
+
     @Override
     protected void setUp() throws Exception {
         deleteDirectory("target/idempotent");
@@ -39,8 +42,8 @@ public class FileConsumerIdempotentTest extends 
ContextTestSupport {
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
             public void configure() throws Exception {
-                
from("file://target/idempotent/?idempotent=true&move=done/${file:name}&delay=10")
-                        .convertBodyTo(String.class).to("mock:result");
+                from(uri)
+                    .convertBodyTo(String.class).to("mock:result");
             }
         };
     }
@@ -66,6 +69,12 @@ public class FileConsumerIdempotentTest extends 
ContextTestSupport {
         // should NOT consume the file again, let a bit time pass to let the 
consumer try to consume it but it should not
         Thread.sleep(100);
         assertMockEndpointsSatisfied();
+
+        FileEndpoint fe = context.getEndpoint(uri, FileEndpoint.class);
+        assertNotNull(fe);
+
+        MemoryIdempotentRepository repo = (MemoryIdempotentRepository) 
fe.getInProgressRepository();
+        assertEquals("Should be no in-progress files", 0, repo.getCacheSize());
     }
 
 }
\ No newline at end of file

Reply via email to