Updated Branches:
  refs/heads/master 73b614da5 -> bce6ea8e1

CAMEL-6355: File consumer: also check eagerly whether a file is already in 
progress, to avoid even a slight chance of duplicate pickup


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/bce6ea8e
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/bce6ea8e
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/bce6ea8e

Branch: refs/heads/master
Commit: bce6ea8e190df0f3fa93270d701a9c960288918c
Parents: 73b614d
Author: Claus Ibsen <[email protected]>
Authored: Mon May 13 09:23:16 2013 +0200
Committer: Claus Ibsen <[email protected]>
Committed: Mon May 13 09:23:16 2013 +0200

----------------------------------------------------------------------
 .../apache/camel/component/file/FileConsumer.java  |   16 ++++----------
 .../camel/component/file/GenericFileConsumer.java  |   12 ++++++++++-
 .../camel/component/file/remote/FtpConsumer.java   |   12 +++-------
 .../camel/component/file/remote/SftpConsumer.java  |   14 +++---------
 4 files changed, 24 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/bce6ea8e/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java 
b/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java
index 0b312f5..3d55e70 100644
--- a/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java
+++ b/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java
@@ -83,7 +83,7 @@ public class FileConsumer extends GenericFileConsumer<File> {
             GenericFile<File> gf = asGenericFile(endpointPath, file, 
getEndpoint().getCharset());
 
             if (file.isDirectory()) {
-                if (endpoint.isRecursive() && isValidFile(gf, true, files) && 
depth < endpoint.getMaxDepth()) {
+                if (endpoint.isRecursive() && depth < endpoint.getMaxDepth() 
&& isValidFile(gf, true, files)) {
                     // recursive scan and add the sub files and folders
                     String subDirectory = fileName + File.separator + 
file.getName();
                     boolean canPollMore = pollDirectory(subDirectory, 
fileList, depth);
@@ -93,16 +93,10 @@ public class FileConsumer extends GenericFileConsumer<File> 
{
                 }
             } else {
                 // Windows can report false to a file on a share so regard it 
always as a file (if its not a directory)
-                if (isValidFile(gf, false, files) && depth >= 
endpoint.minDepth) {
-                    if (isInProgress(gf)) {
-                        if (log.isTraceEnabled()) {
-                            log.trace("Skipping as file is already in 
progress: {}", gf.getFileName());
-                        }
-                    } else {
-                        log.trace("Adding valid file: {}", file);
-                        // matched file so add
-                        fileList.add(gf);
-                    }
+                if (depth >= endpoint.minDepth && isValidFile(gf, false, 
files)) {
+                    log.trace("Adding valid file: {}", file);
+                    // matched file so add
+                    fileList.add(gf);
                 }
 
             }

http://git-wip-us.apache.org/repos/asf/camel/blob/bce6ea8e/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
 
b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
index 4e60e29..6cb3262 100644
--- 
a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
+++ 
b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
@@ -421,7 +421,17 @@ public abstract class GenericFileConsumer<T> extends 
ScheduledBatchPollingConsum
         if (!isMatched(file, isDirectory, files)) {
             log.trace("File did not match. Will skip this file: {}", file);
             return false;
-        } else if (endpoint.isIdempotent()) {
+        }
+
+        // if its a file then check if its already in progress
+        if (!isDirectory && isInProgress(file)) {
+            if (log.isTraceEnabled()) {
+                log.trace("Skipping as file is already in progress: {}", 
file.getFileName());
+            }
+            return false;
+        }
+
+        if (endpoint.isIdempotent()) {
             // use absolute file path as default key, but evaluate if an 
expression key was configured
             String key = file.getAbsoluteFilePath();
             if (endpoint.getIdempotentKey() != null) {

http://git-wip-us.apache.org/repos/asf/camel/blob/bce6ea8e/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java
 
b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java
index e8b9ab2..7b042ea 100644
--- 
a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java
+++ 
b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java
@@ -112,7 +112,7 @@ public class FtpConsumer extends 
RemoteFileConsumer<FTPFile> {
 
             if (file.isDirectory()) {
                 RemoteFile<FTPFile> remote = asRemoteFile(absolutePath, file);
-                if (endpoint.isRecursive() && isValidFile(remote, true, files) 
&& depth < endpoint.getMaxDepth()) {
+                if (endpoint.isRecursive() && depth < endpoint.getMaxDepth() 
&& isValidFile(remote, true, files)) {
                     // recursive scan and add the sub files and folders
                     String subDirectory = file.getName();
                     String path = absolutePath + "/" + subDirectory;
@@ -123,13 +123,9 @@ public class FtpConsumer extends 
RemoteFileConsumer<FTPFile> {
                 }
             } else if (file.isFile()) {
                 RemoteFile<FTPFile> remote = asRemoteFile(absolutePath, file);
-                if (isValidFile(remote, false, files) && depth >= 
endpoint.getMinDepth()) {
-                    if (isInProgress(remote)) {
-                        log.trace("Skipping as file is already in progress: 
{}", remote.getFileName());
-                    } else {
-                        // matched file so add
-                        fileList.add(remote);
-                    }
+                if (depth >= endpoint.getMinDepth() && isValidFile(remote, 
false, files)) {
+                    // matched file so add
+                    fileList.add(remote);
                 }
             } else {
                 log.debug("Ignoring unsupported remote file type: " + file);

http://git-wip-us.apache.org/repos/asf/camel/blob/bce6ea8e/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java
 
b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java
index 025a8e0..1dbfd9d 100644
--- 
a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java
+++ 
b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java
@@ -109,7 +109,7 @@ public class SftpConsumer extends 
RemoteFileConsumer<ChannelSftp.LsEntry> {
 
             if (file.getAttrs().isDir()) {
                 RemoteFile<ChannelSftp.LsEntry> remote = 
asRemoteFile(absolutePath, file);
-                if (endpoint.isRecursive() && isValidFile(remote, true, files) 
&& depth < endpoint.getMaxDepth()) {
+                if (endpoint.isRecursive() && depth < endpoint.getMaxDepth() 
&& isValidFile(remote, true, files)) {
                     // recursive scan and add the sub files and folders
                     String subDirectory = file.getFilename();
                     String path = absolutePath + "/" + subDirectory;
@@ -122,15 +122,9 @@ public class SftpConsumer extends 
RemoteFileConsumer<ChannelSftp.LsEntry> {
                 // just assuming its a file we should poll
             } else {
                 RemoteFile<ChannelSftp.LsEntry> remote = 
asRemoteFile(absolutePath, file);
-                if (isValidFile(remote, false, files) && depth >= 
endpoint.getMinDepth()) {
-                    if (isInProgress(remote)) {
-                        if (log.isTraceEnabled()) {
-                            log.trace("Skipping as file is already in 
progress: {}", remote.getFileName());
-                        }
-                    } else {
-                        // matched file so add
-                        fileList.add(remote);
-                    }
+                if (depth >= endpoint.getMinDepth() && isValidFile(remote, 
false, files)) {
+                    // matched file so add
+                    fileList.add(remote);
                 }
             }
         }

Reply via email to