Updated Branches:
refs/heads/camel-2.11.x e348badd8 -> 8ea81d59b
CAMEL-6936: Fixed file/ftp consumer with idempotent=true, which may not detect a
changed file as a new file because the file was still regarded as in-progress.
Conflicts:
camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/8ea81d59
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/8ea81d59
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/8ea81d59
Branch: refs/heads/camel-2.11.x
Commit: 8ea81d59b7cb6c426af385f4d0555b1597d899a9
Parents: e348bad
Author: Claus Ibsen <[email protected]>
Authored: Mon Nov 11 12:19:24 2013 +0100
Committer: Claus Ibsen <[email protected]>
Committed: Mon Nov 11 12:32:03 2013 +0100
----------------------------------------------------------------------
.../component/file/GenericFileConsumer.java | 31 +++++++++++++-------
.../file/FileConsumerIdempotentTest.java | 13 ++++++--
2 files changed, 32 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/8ea81d59/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
----------------------------------------------------------------------
diff --git
a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
index 7317fe6..9e297f5 100644
---
a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
+++
b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
@@ -429,21 +429,32 @@ public abstract class GenericFileConsumer<T> extends
ScheduledBatchPollingConsum
return false;
}
- if (endpoint.isIdempotent()) {
- // use absolute file path as default key, but evaluate if an
expression key was configured
- String key = file.getAbsoluteFilePath();
- if (endpoint.getIdempotentKey() != null) {
- Exchange dummy = endpoint.createExchange(file);
- key = endpoint.getIdempotentKey().evaluate(dummy,
String.class);
+ boolean answer = true;
+ String key = null;
+ try {
+ // if its a file then check we have the file in the idempotent
registry already
+ if (endpoint.isIdempotent()) {
+ // use absolute file path as default key, but evaluate if an
expression key was configured
+ key = file.getAbsoluteFilePath();
+ if (endpoint.getIdempotentKey() != null) {
+ Exchange dummy = endpoint.createExchange(file);
+ key = endpoint.getIdempotentKey().evaluate(dummy,
String.class);
+ }
+ if (key != null &&
endpoint.getIdempotentRepository().contains(key)) {
+ log.trace("This consumer is idempotent and the file has
been consumed before. Will skip this file: {}", file);
+ answer = false;
+ }
}
- if (key != null &&
endpoint.getIdempotentRepository().contains(key)) {
- log.trace("This consumer is idempotent and the file has been
consumed before. Will skip this file: {}", file);
- return false;
+ } finally {
+ // ensure to run this in finally block in case of runtime
exceptions being thrown
+ if (!answer) {
+ // remove file from the in progress list as its no longer in
progress
+ endpoint.getInProgressRepository().remove(key);
}
}
// file matched
- return true;
+ return answer;
}
/**
http://git-wip-us.apache.org/repos/asf/camel/blob/8ea81d59/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentTest.java
----------------------------------------------------------------------
diff --git
a/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentTest.java
b/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentTest.java
index 4d1849a..7ae8b73 100644
---
a/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentTest.java
+++
b/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerIdempotentTest.java
@@ -22,12 +22,15 @@ import org.apache.camel.ContextTestSupport;
import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.idempotent.MemoryIdempotentRepository;
/**
* Unit test for the idempotent=true option.
*/
public class FileConsumerIdempotentTest extends ContextTestSupport {
+ private String uri =
"file://target/idempotent/?idempotent=true&move=done/${file:name}&delay=10";
+
@Override
protected void setUp() throws Exception {
deleteDirectory("target/idempotent");
@@ -39,8 +42,8 @@ public class FileConsumerIdempotentTest extends
ContextTestSupport {
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
public void configure() throws Exception {
-
from("file://target/idempotent/?idempotent=true&move=done/${file:name}&delay=10")
- .convertBodyTo(String.class).to("mock:result");
+ from(uri)
+ .convertBodyTo(String.class).to("mock:result");
}
};
}
@@ -66,6 +69,12 @@ public class FileConsumerIdempotentTest extends
ContextTestSupport {
// should NOT consume the file again, let a bit time pass to let the
consumer try to consume it but it should not
Thread.sleep(100);
assertMockEndpointsSatisfied();
+
+ FileEndpoint fe = context.getEndpoint(uri, FileEndpoint.class);
+ assertNotNull(fe);
+
+ MemoryIdempotentRepository repo = (MemoryIdempotentRepository)
fe.getInProgressRepository();
+ assertEquals("Should be no in-progress files", 0, repo.getCacheSize());
}
}
\ No newline at end of file