Updated Branches: refs/heads/camel-2.11.x 27d9449f4 -> 5807b8a0e
CAMEL-6355: File consumer Check in progress eager as well to avoid any very slight chance for duplicate pickup Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/5807b8a0 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/5807b8a0 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/5807b8a0 Branch: refs/heads/camel-2.11.x Commit: 5807b8a0eda296cf65de18e6d98b82d319b89aaf Parents: 27d9449 Author: Claus Ibsen <[email protected]> Authored: Mon May 13 09:23:16 2013 +0200 Committer: Claus Ibsen <[email protected]> Committed: Mon May 13 09:23:52 2013 +0200 ---------------------------------------------------------------------- .../apache/camel/component/file/FileConsumer.java | 16 ++++---------- .../camel/component/file/GenericFileConsumer.java | 12 ++++++++++- .../camel/component/file/remote/FtpConsumer.java | 12 +++------- .../camel/component/file/remote/SftpConsumer.java | 14 +++--------- 4 files changed, 24 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/5807b8a0/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java b/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java index 0b312f5..3d55e70 100644 --- a/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java +++ b/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java @@ -83,7 +83,7 @@ public class FileConsumer extends GenericFileConsumer<File> { GenericFile<File> gf = asGenericFile(endpointPath, file, getEndpoint().getCharset()); if (file.isDirectory()) { - if (endpoint.isRecursive() && isValidFile(gf, true, files) && depth < endpoint.getMaxDepth()) { + if (endpoint.isRecursive() && depth < endpoint.getMaxDepth() && isValidFile(gf, true, files)) { // recursive scan and add the sub files and folders String subDirectory = fileName + File.separator + file.getName(); boolean canPollMore = pollDirectory(subDirectory, fileList, depth); @@ -93,16 +93,10 @@ public class FileConsumer extends GenericFileConsumer<File> { } } else { // Windows can report false to a file on a share so regard it always as a file (if its not a directory) - if (isValidFile(gf, false, files) && depth >= endpoint.minDepth) { - if (isInProgress(gf)) { - if (log.isTraceEnabled()) { - log.trace("Skipping as file is already in progress: {}", gf.getFileName()); - } - } else { - log.trace("Adding valid file: {}", file); - // matched file so add - fileList.add(gf); - } + if (depth >= endpoint.minDepth && isValidFile(gf, false, files)) { + log.trace("Adding valid file: {}", file); + // matched file so add + fileList.add(gf); } } http://git-wip-us.apache.org/repos/asf/camel/blob/5807b8a0/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java index 7240834..7317fe6 100644 --- a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java +++ b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java @@ -419,7 +419,17 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum if (!isMatched(file, isDirectory, files)) { log.trace("File did not match. Will skip this file: {}", file); return false; - } else if (endpoint.isIdempotent()) { + } + + // if its a file then check if its already in progress + if (!isDirectory && isInProgress(file)) { + if (log.isTraceEnabled()) { + log.trace("Skipping as file is already in progress: {}", file.getFileName()); + } + return false; + } + + if (endpoint.isIdempotent()) { // use absolute file path as default key, but evaluate if an expression key was configured String key = file.getAbsoluteFilePath(); if (endpoint.getIdempotentKey() != null) { http://git-wip-us.apache.org/repos/asf/camel/blob/5807b8a0/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java index e8b9ab2..7b042ea 100644 --- a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java +++ b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java @@ -112,7 +112,7 @@ public class FtpConsumer extends RemoteFileConsumer<FTPFile> { if (file.isDirectory()) { RemoteFile<FTPFile> remote = asRemoteFile(absolutePath, file); - if (endpoint.isRecursive() && isValidFile(remote, true, files) && depth < endpoint.getMaxDepth()) { + if (endpoint.isRecursive() && depth < endpoint.getMaxDepth() && isValidFile(remote, true, files)) { // recursive scan and add the sub files and folders String subDirectory = file.getName(); String path = absolutePath + "/" + subDirectory; @@ -123,13 +123,9 @@ public class FtpConsumer extends RemoteFileConsumer<FTPFile> { } } else if (file.isFile()) { RemoteFile<FTPFile> remote = asRemoteFile(absolutePath, file); - if (isValidFile(remote, false, files) && depth >= endpoint.getMinDepth()) { - if (isInProgress(remote)) { - log.trace("Skipping as file is already in progress: {}", remote.getFileName()); - } else { - // matched file so add - fileList.add(remote); - } + if (depth >= endpoint.getMinDepth() && isValidFile(remote, false, files)) { + // matched file so add + fileList.add(remote); } } else { log.debug("Ignoring unsupported remote file type: " + file); http://git-wip-us.apache.org/repos/asf/camel/blob/5807b8a0/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java index 025a8e0..1dbfd9d 100644 --- a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java +++ b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java @@ -109,7 +109,7 @@ public class SftpConsumer extends RemoteFileConsumer<ChannelSftp.LsEntry> { if (file.getAttrs().isDir()) { RemoteFile<ChannelSftp.LsEntry> remote = asRemoteFile(absolutePath, file); - if (endpoint.isRecursive() && isValidFile(remote, true, files) && depth < endpoint.getMaxDepth()) { + if (endpoint.isRecursive() && depth < endpoint.getMaxDepth() && isValidFile(remote, true, files)) { // recursive scan and add the sub files and folders String subDirectory = file.getFilename(); String path = absolutePath + "/" + subDirectory; @@ -122,15 +122,9 @@ public class SftpConsumer extends RemoteFileConsumer<ChannelSftp.LsEntry> { // just assuming its a file we should poll } else { RemoteFile<ChannelSftp.LsEntry> remote = asRemoteFile(absolutePath, file); - if (isValidFile(remote, false, files) && depth >= endpoint.getMinDepth()) { - if (isInProgress(remote)) { - if (log.isTraceEnabled()) { - log.trace("Skipping as file is already in progress: {}", remote.getFileName()); - } - } else { - // matched file so add - fileList.add(remote); - } + if (depth >= endpoint.getMinDepth() && isValidFile(remote, false, files)) { + // matched file so add + fileList.add(remote); } } }
