This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new 0b0125b8b7f (chores) camel-file: code cleanup 0b0125b8b7f is described below commit 0b0125b8b7f62b3ade04b0032c166a1bb4cd3c27 Author: Otavio Rodolfo Piske <angusyo...@gmail.com> AuthorDate: Wed May 8 08:18:30 2024 +0200 (chores) camel-file: code cleanup - break large and complex methods --- .../apache/camel/component/file/FileConsumer.java | 15 +- .../apache/camel/component/file/FileEndpoint.java | 21 +- .../camel/component/file/FileOperations.java | 184 ++++++---- .../camel/component/file/GenericFileConsumer.java | 156 +++++---- .../camel/component/file/GenericFileProducer.java | 234 +++++++------ .../file/GenericFileSendDynamicAware.java | 42 +-- .../FileChangedExclusiveReadLockStrategy.java | 7 +- .../FileLockExclusiveReadLockStrategy.java | 32 +- .../file/strategy/FileProcessStrategyFactory.java | 371 ++++++++++++--------- .../MarkerFileExclusiveReadLockStrategy.java | 13 + 10 files changed, 624 insertions(+), 451 deletions(-) diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java b/components/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java index 5d9cebd645a..1d0c768dd82 100644 --- a/components/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java +++ b/components/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java @@ -87,10 +87,18 @@ public class FileConsumer extends GenericFileConsumer<File> implements ResumeAwa Arrays.sort(files, Comparator.comparing(File::getAbsoluteFile)); } + if (processPolledFiles(fileList, depth, files)) { + return false; + } + + return true; + } + + private boolean processPolledFiles(List<GenericFile<File>> fileList, int depth, File[] files) { for (File file : files) { // check if we can continue polling in files if (!canPollMoreFiles(fileList)) { - return false; + return true; } // trace log as Windows/Unix can have different views what the file is @@ -117,11 +125,10 @@ public class FileConsumer extends GenericFileConsumer<File> implements ResumeAwa } if (processEntry(fileList, depth, file, gf, files)) { - return false; + return true; } } - - return true; + return false; } private boolean processEntry(List<GenericFile<File>> fileList, int depth, File file, GenericFile<File> gf, File[] files) { diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/FileEndpoint.java b/components/camel-file/src/main/java/org/apache/camel/component/file/FileEndpoint.java index eb07f3d5e02..330925f5873 100644 --- a/components/camel-file/src/main/java/org/apache/camel/component/file/FileEndpoint.java +++ b/components/camel-file/src/main/java/org/apache/camel/component/file/FileEndpoint.java @@ -104,14 +104,8 @@ public class FileEndpoint extends GenericFileEndpoint<File> { if (!file.exists() && !file.isDirectory()) { tryCreateDirectory(); } - if (!isStartingDirectoryMustExist() && isStartingDirectoryMustHaveAccess()) { - throw new IllegalArgumentException( - "You cannot set startingDirectoryMustHaveAccess=true without setting startingDirectoryMustExist=true"); - } else if (isStartingDirectoryMustExist() && isStartingDirectoryMustHaveAccess()) { - if (!file.canRead() || !file.canWrite()) { - throw new IOException("Starting directory permission denied: " + file); - } - } + tryReadingStartDirectory(); + FileConsumer result = newFileConsumer(processor, operations); if (isDelete() && getMove() != null) { @@ -142,6 +136,17 @@ public class FileEndpoint extends GenericFileEndpoint<File> { return result; } + private void tryReadingStartDirectory() throws IOException { + if (!isStartingDirectoryMustExist() && isStartingDirectoryMustHaveAccess()) { + throw new IllegalArgumentException( + "You cannot set startingDirectoryMustHaveAccess=true without setting startingDirectoryMustExist=true"); + } else if (isStartingDirectoryMustExist() && isStartingDirectoryMustHaveAccess()) { + if (!file.canRead() || !file.canWrite()) { + throw new IOException("Starting directory permission denied: " + file); + } + } + } + private void readLockCheck() { // check if its a valid String valid = "none,markerFile,fileLock,rename,changed,idempotent,idempotent-changed,idempotent-rename"; diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/FileOperations.java b/components/camel-file/src/main/java/org/apache/camel/component/file/FileOperations.java index 69922bcb574..a4efa079cbf 100644 --- a/components/camel-file/src/main/java/org/apache/camel/component/file/FileOperations.java +++ b/components/camel-file/src/main/java/org/apache/camel/component/file/FileOperations.java @@ -249,31 +249,14 @@ public class FileOperations implements GenericFileOperations<File> { // if an existing file already exists what should we do? if (file.exists()) { - if (endpoint.getFileExist() == GenericFileExist.Ignore) { - // ignore but indicate that the file was written - LOG.trace("An existing file already exists: {}. Ignore and do not override it.", file); + if (applyExistingFilePolicy(fileName, file)) { return true; - } else if (endpoint.getFileExist() == GenericFileExist.Fail) { - throw new GenericFileOperationFailedException("File already exist: " + file + ". Cannot write new file."); - } else if (endpoint.getFileExist() == GenericFileExist.Move) { - // move any existing file first - this.endpoint.getMoveExistingFileStrategy().moveExistingFile(endpoint, this, fileName); } } // Do an explicit test for a null body and decide what to do if (exchange.getIn().getBody() == null) { - if (endpoint.isAllowNullBody()) { - LOG.trace("Writing empty file."); - try { - writeFileEmptyBody(file); - return true; - } catch (IOException e) { - throw new GenericFileOperationFailedException("Cannot store file: " + file, e); - } - } else { - throw new GenericFileOperationFailedException("Cannot write null body to file: " + file); - } + return handleNullBody(file); } // we can write the file by 3 different techniques @@ -294,18 +277,7 @@ public class FileOperations implements GenericFileOperations<File> { if (charset == null && endpoint.getFileExist() != GenericFileExist.Append) { // if no charset and not in appending mode, then we can try // using file directly (optimized) - Object body = exchange.getIn().getBody(); - if (body instanceof WrappedFile<?> wrapped) { - body = wrapped.getFile(); - if (!(body instanceof File)) { - // the wrapped file may be from remote (FTP) which then can store - // a local java.io.File handle if storing to local work-dir so check for that - Object maybeFile = wrapped.getBody(); - if (maybeFile instanceof File) { - body = maybeFile; - } - } - } + final Object body = extractBodyFromExchange(exchange); if (body instanceof File) { source = (File) body; fileBased = true; @@ -320,34 +292,7 @@ public class FileOperations implements GenericFileOperations<File> { // a full file to file copy, as the local work copy is to be // deleted afterwards anyway // local work path - String local = exchange.getIn().getHeader(FileConstants.FILE_LOCAL_WORK_PATH, String.class); - if (local != null) { - File f = new File(local); - if (f.exists()) { - boolean renamed = writeFileByLocalWorkPath(f, file); - if (renamed) { - // try to keep last modified timestamp if configured to - // do so - keepLastModified(exchange, file); - // set permissions if the chmod option was set - setPermissions(file); - // clear header as we have renamed the file - exchange.getIn().setHeader(FileConstants.FILE_LOCAL_WORK_PATH, null); - // return as the operation is complete, we just renamed - // the local work file - // to the target. - return true; - } - } - } else if (source != null && source.exists()) { - // no there is no local work file so use file to file copy - // if the source exists - writeFileByFile(source, file, exchange); - // try to keep last modified timestamp if configured to do - // so - keepLastModified(exchange, file); - // set permissions if the chmod option was set - setPermissions(file); + if (handleFileAsFileSource(exchange, file, source)) { return true; } } @@ -355,25 +300,13 @@ public class FileOperations implements GenericFileOperations<File> { if (charset != null) { // charset configured so we must use a reader so we can write // with encoding - Reader in = exchange.getContext().getTypeConverter().tryConvertTo(Reader.class, exchange, - exchange.getIn().getBody()); - if (in == null) { - // okay no direct reader conversion, so use an input stream - // (which a lot can be converted as) - InputStream is = exchange.getIn().getMandatoryBody(InputStream.class); - in = new InputStreamReader(is); - } - // buffer the reader - in = IOHelper.buffered(in); - writeFileByReaderWithCharset(in, file, charset); + handleReaderAsFileSource(exchange, file, charset); } else if (exchange.getIn().getBody() instanceof String) { // If the body is a string, write it directly - String stringBody = (String) exchange.getIn().getBody(); - writeFileByString(stringBody, file); + handleStringAsFileSource(exchange, file); } else { // fallback and use stream based - InputStream in = exchange.getIn().getMandatoryBody(InputStream.class); - writeFileByStream(in, file); + handleStreamAsFileSource(exchange, file); } // try to keep last modified timestamp if configured to do so @@ -387,6 +320,109 @@ public class FileOperations implements GenericFileOperations<File> { } } + private void handleStreamAsFileSource(Exchange exchange, File file) throws InvalidPayloadException, IOException { + InputStream in = exchange.getIn().getMandatoryBody(InputStream.class); + writeFileByStream(in, file); + } + + private void handleStringAsFileSource(Exchange exchange, File file) throws IOException { + String stringBody = (String) exchange.getIn().getBody(); + writeFileByString(stringBody, file); + } + + private void handleReaderAsFileSource(Exchange exchange, File file, String charset) + throws InvalidPayloadException, IOException { + Reader in = exchange.getContext().getTypeConverter().tryConvertTo(Reader.class, exchange, + exchange.getIn().getBody()); + if (in == null) { + // okay no direct reader conversion, so use an input stream + // (which a lot can be converted as) + InputStream is = exchange.getIn().getMandatoryBody(InputStream.class); + in = new InputStreamReader(is); + } + // buffer the reader + in = IOHelper.buffered(in); + writeFileByReaderWithCharset(in, file, charset); + } + + private boolean handleFileAsFileSource(Exchange exchange, File file, File source) throws IOException { + String local = exchange.getIn().getHeader(FileConstants.FILE_LOCAL_WORK_PATH, String.class); + if (local != null) { + File f = new File(local); + if (f.exists()) { + boolean renamed = writeFileByLocalWorkPath(f, file); + if (renamed) { + // try to keep last modified timestamp if configured to + // do so + keepLastModified(exchange, file); + // set permissions if the chmod option was set + setPermissions(file); + // clear header as we have renamed the file + exchange.getIn().setHeader(FileConstants.FILE_LOCAL_WORK_PATH, null); + // return as the operation is complete, we just renamed + // the local work file + // to the target. + return true; + } + } + } else if (source != null && source.exists()) { + // no there is no local work file so use file to file copy + // if the source exists + writeFileByFile(source, file, exchange); + // try to keep last modified timestamp if configured to do + // so + keepLastModified(exchange, file); + // set permissions if the chmod option was set + setPermissions(file); + return true; + } + return false; + } + + private static Object extractBodyFromExchange(Exchange exchange) { + Object body = exchange.getIn().getBody(); + if (body instanceof WrappedFile<?> wrapped) { + body = wrapped.getFile(); + if (!(body instanceof File)) { + // the wrapped file may be from remote (FTP) which then can store + // a local java.io.File handle if storing to local work-dir so check for that + Object maybeFile = wrapped.getBody(); + if (maybeFile instanceof File) { + body = maybeFile; + } + } + } + return body; + } + + private boolean handleNullBody(File file) { + if (endpoint.isAllowNullBody()) { + LOG.trace("Writing empty file."); + try { + writeFileEmptyBody(file); + return true; + } catch (IOException e) { + throw new GenericFileOperationFailedException("Cannot store file: " + file, e); + } + } else { + throw new GenericFileOperationFailedException("Cannot write null body to file: " + file); + } + } + + private boolean applyExistingFilePolicy(String fileName, File file) { + if (endpoint.getFileExist() == GenericFileExist.Ignore) { + // ignore but indicate that the file was written + LOG.trace("An existing file already exists: {}. Ignore and do not override it.", file); + return true; + } else if (endpoint.getFileExist() == GenericFileExist.Fail) { + throw new GenericFileOperationFailedException("File already exist: " + file + ". Cannot write new file."); + } else if (endpoint.getFileExist() == GenericFileExist.Move) { + // move any existing file first + this.endpoint.getMoveExistingFileStrategy().moveExistingFile(endpoint, this, fileName); + } + return false; + } + private void setPermissions(File file) throws IOException { if (ObjectHelper.isNotEmpty(endpoint.getChmod())) { Set<PosixFilePermission> permissions = endpoint.getPermissions(); diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java b/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java index 7fae79851b1..5ff664d7189 100644 --- a/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java +++ b/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java @@ -430,42 +430,9 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum try { if (isRetrieveFile()) { - // retrieve the file using the stream - LOG.trace("Retrieving file: {} from: {}", name, endpoint); - - // retrieve the file and check it was a success - boolean retrieved; - Exception cause = null; - try { - retrieved = operations.retrieveFile(name, exchange, target.getFileLength()); - } catch (Exception e) { - retrieved = false; - cause = e; - } - - if (!retrieved) { - if (ignoreCannotRetrieveFile(name, exchange, cause)) { - LOG.trace("Cannot retrieve file {} maybe it does not exist. Ignoring.", name); - // remove file from the in progress list as we could not - // retrieve it, but should ignore - endpoint.getInProgressRepository().remove(absoluteFileName); - return false; - } else { - // throw exception to handle the problem with retrieving - // the file - // then if the method return false or throws an - // exception is handled the same in here - // as in both cases an exception is being thrown - if (cause instanceof GenericFileOperationFailedException) { - throw cause; - } else { - throw new GenericFileOperationFailedException( - "Cannot retrieve file: " + file + " from: " + endpoint, cause); - } - } + if (tryRetrievingFile(exchange, name, target, absoluteFileName, file)) { + return false; } - - LOG.trace("Retrieved file: {} from: {}", name, endpoint); } else { LOG.trace("Skipped retrieval of file: {} from: {}", name, endpoint); exchange.getIn().setBody(null); @@ -508,6 +475,48 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum return true; } + private boolean tryRetrievingFile( + Exchange exchange, String name, GenericFile<T> target, String absoluteFileName, GenericFile<T> file) + throws Exception { + // retrieve the file using the stream + LOG.trace("Retrieving file: {} from: {}", name, endpoint); + + // retrieve the file and check it was a success + boolean retrieved; + Exception cause = null; + try { + retrieved = operations.retrieveFile(name, exchange, target.getFileLength()); + } catch (Exception e) { + retrieved = false; + cause = e; + } + + if (!retrieved) { + if (ignoreCannotRetrieveFile(name, exchange, cause)) { + LOG.trace("Cannot retrieve file {} maybe it does not exist. Ignoring.", name); + // remove file from the in progress list as we could not + // retrieve it, but should ignore + endpoint.getInProgressRepository().remove(absoluteFileName); + return true; + } else { + // throw exception to handle the problem with retrieving + // the file + // then if the method return false or throws an + // exception is handled the same in here + // as in both cases an exception is being thrown + if (cause instanceof GenericFileOperationFailedException) { + throw cause; + } else { + throw new GenericFileOperationFailedException( + "Cannot retrieve file: " + file + " from: " + endpoint, cause); + } + } + } + + LOG.trace("Retrieved file: {} from: {}", name, endpoint); + return false; + } + /** * Updates the information on {@link Message} after we have acquired read-lock and can begin process the file. * @@ -697,34 +706,8 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum return true; } - // exclude take precedence over include - if (excludePattern != null) { - if (excludePattern.matcher(name).matches()) { - return false; - } - } - if (excludeExt != null) { - String fname = file.getFileName().toLowerCase(); - for (String exclude : excludeExt) { - if (fname.endsWith("." + exclude)) { - return false; - } - } - } - if (includePattern != null) { - if (!includePattern.matcher(name).matches()) { - return false; - } - } - if (includeExt != null) { - String fname = file.getFileName().toLowerCase(); - boolean any = false; - for (String include : includeExt) { - any |= fname.endsWith("." + include); - } - if (!any) { - return false; - } + if (hasInclusionsOrExclusions(file, name)) { + return false; } if (endpoint.getFileName() != null) { @@ -768,6 +751,53 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum return true; } + private boolean hasInclusionsOrExclusions(GenericFile<T> file, String name) { + // exclude take precedence over include + if (excludePattern != null) { + if (excludePattern.matcher(name).matches()) { + return true; + } + } + if (excludeExt != null) { + String fname = file.getFileName().toLowerCase(); + if (hasExtExlusions(fname)) { + return true; + } + } + if (includePattern != null) { + if (!includePattern.matcher(name).matches()) { + return true; + } + } + if (includeExt != null) { + String fname = file.getFileName().toLowerCase(); + if (hasExtInclusions(fname)) { + return true; + } + } + return false; + } + + private boolean hasExtInclusions(String fname) { + boolean any = false; + for (String include : includeExt) { + any |= fname.endsWith("." + include); + } + if (!any) { + return true; + } + return false; + } + + private boolean hasExtExlusions(String fname) { + for (String exclude : excludeExt) { + if (fname.endsWith("." + exclude)) { + return true; + } + } + return false; + } + /** * Strategy to perform file matching based on endpoint configuration in terms of done file name. * diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileProducer.java b/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileProducer.java index 0031201e717..5c549b1f5ed 100644 --- a/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileProducer.java +++ b/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileProducer.java @@ -138,27 +138,8 @@ public class GenericFileProducer<T> extends DefaultProducer { targetExists = operations.existsFile(target); if (targetExists) { - LOG.trace("EagerDeleteTargetFile, target exists"); - - if (endpoint.getFileExist() == GenericFileExist.Ignore) { - // ignore but indicate that the file was written - LOG.trace("An existing file already exists: {}. Ignore and do not override it.", target); + if (handleExistingTargetEager(target)) { return; - } else if (endpoint.getFileExist() == GenericFileExist.Fail) { - throw new GenericFileOperationFailedException( - "File already exist: " + target + ". Cannot write new file."); - } else if (endpoint.getFileExist() == GenericFileExist.Move) { - // move any existing file first - this.endpoint.getMoveExistingFileStrategy().moveExistingFile(endpoint, operations, target); - } else if (endpoint.isEagerDeleteTargetFile() && endpoint.getFileExist() == GenericFileExist.Override) { - // we override the target so we do this by deleting - // it so the temp file can be renamed later - // with success as the existing target file have - // been deleted - LOG.trace("Eagerly deleting existing file: {}", target); - if (!operations.deleteFile(target)) { - throw new GenericFileOperationFailedException("Cannot delete file: " + target); - } } } } @@ -166,9 +147,7 @@ public class GenericFileProducer<T> extends DefaultProducer { // delete any pre existing temp file if (endpoint.getFileExist() != GenericFileExist.TryRename && operations.existsFile(tempTarget)) { LOG.trace("Deleting existing temp file: {}", tempTarget); - if (!operations.deleteFile(tempTarget)) { - throw new GenericFileOperationFailedException("Cannot delete file: " + tempTarget); - } + tryOverridingFile(tempTarget); } } @@ -185,24 +164,8 @@ public class GenericFileProducer<T> extends DefaultProducer { targetExists = operations.existsFile(target); if (targetExists) { - LOG.trace("Not using EagerDeleteTargetFile, target exists"); - - if (endpoint.getFileExist() == GenericFileExist.Ignore) { - // ignore but indicate that the file was written - LOG.trace("An existing file already exists: {}. Ignore and do not override it.", target); + if (handleExistingTarget(target)) { return; - } else if (endpoint.getFileExist() == GenericFileExist.Fail) { - throw new GenericFileOperationFailedException( - "File already exist: " + target + ". Cannot write new file."); - } else if (endpoint.getFileExist() == GenericFileExist.Override) { - // we override the target so we do this by deleting - // it so the temp file can be renamed later - // with success as the existing target file have - // been deleted - LOG.trace("Deleting existing file: {}", target); - if (!operations.deleteFile(target)) { - throw new GenericFileOperationFailedException("Cannot delete file: " + target); - } } } } @@ -222,22 +185,7 @@ public class GenericFileProducer<T> extends DefaultProducer { // any done file to write? if (endpoint.getDoneFileName() != null) { - String doneFileName = endpoint.createDoneFileName(target); - StringHelper.notEmpty(doneFileName, "doneFileName", endpoint); - - // create empty exchange with empty body to write as the done - // file - Exchange empty = new DefaultExchange(exchange); - empty.getIn().setBody(""); - - LOG.trace("Writing done file: [{}]", doneFileName); - // delete any existing done file - if (operations.existsFile(doneFileName)) { - if (!operations.deleteFile(doneFileName)) { - throw new GenericFileOperationFailedException("Cannot delete existing done file: " + doneFileName); - } - } - writeFile(empty, doneFileName); + writeDoneFile(exchange, target); } // let's store the name we really used in the header, so end-users @@ -250,6 +198,82 @@ public class GenericFileProducer<T> extends DefaultProducer { postWriteCheck(exchange); } + private static void throwFileAlreadyExistException(String target) { + throw new GenericFileOperationFailedException( + "File already exist: " + target + ". Cannot write new file."); + } + + private static boolean doIgnore(String target) { + // ignore but indicate that the file was written + LOG.trace("An existing file already exists: {}. Ignore and do not override it.", target); + return true; + } + + private void tryOverridingFile(String target) { + if (!operations.deleteFile(target)) { + throw new GenericFileOperationFailedException("Cannot delete file: " + target); + } + } + + private boolean handleExistingTargetEager(String target) { + LOG.trace("EagerDeleteTargetFile, target exists"); + + if (endpoint.getFileExist() == GenericFileExist.Ignore) { + return doIgnore(target); + } else if (endpoint.getFileExist() == GenericFileExist.Fail) { + throwFileAlreadyExistException(target); + } else if (endpoint.getFileExist() == GenericFileExist.Move) { + // move any existing file first + this.endpoint.getMoveExistingFileStrategy().moveExistingFile(endpoint, operations, target); + } else if (endpoint.isEagerDeleteTargetFile() && endpoint.getFileExist() == GenericFileExist.Override) { + // we override the target so we do this by deleting + // it so the temp file can be renamed later + // with success as the existing target file have + // been deleted + LOG.trace("Eagerly deleting existing file: {}", target); + tryOverridingFile(target); + } + return false; + } + + private boolean handleExistingTarget(String target) { + LOG.trace("Not using EagerDeleteTargetFile, target exists"); + + if (endpoint.getFileExist() == GenericFileExist.Ignore) { + // ignore but indicate that the file was written + return doIgnore(target); + } else if (endpoint.getFileExist() == GenericFileExist.Fail) { + throwFileAlreadyExistException(target); + } else if (endpoint.getFileExist() == GenericFileExist.Override) { + // we override the target so we do this by deleting + // it so the temp file can be renamed later + // with success as the existing target file have + // been deleted + LOG.trace("Deleting existing file: {}", target); + tryOverridingFile(target); + } + return false; + } + + private void writeDoneFile(Exchange exchange, String target) { + String doneFileName = endpoint.createDoneFileName(target); + StringHelper.notEmpty(doneFileName, "doneFileName", endpoint); + + // create empty exchange with empty body to write as the done + // file + Exchange empty = new DefaultExchange(exchange); + empty.getIn().setBody(""); + + LOG.trace("Writing done file: [{}]", doneFileName); + // delete any existing done file + if (operations.existsFile(doneFileName)) { + if (!operations.deleteFile(doneFileName)) { + throw new GenericFileOperationFailedException("Cannot delete existing done file: " + doneFileName); + } + } + writeFile(empty, doneFileName); + } + protected void writeChecksumFile(Exchange exchange, String target) throws Exception { String algorithm = endpoint.getChecksumFileAlgorithm(); String checksumFileName = target + "." + algorithm; @@ -326,19 +350,10 @@ public class GenericFileProducer<T> extends DefaultProducer { public String createFileName(Exchange exchange) { String answer; - // overrule takes precedence - Object value; - Object overrule = exchange.getIn().getHeader(FileConstants.OVERRULE_FILE_NAME); - if (overrule != null) { - if (overrule instanceof Expression) { - value = overrule; - } else { - value = exchange.getContext().getTypeConverter().convertTo(String.class, exchange, overrule); - } - } else { - value = exchange.getIn().getHeader(FileConstants.FILE_NAME); - } + + // overrule takes precedence + final Object value = getOverrule(exchange, overrule); // if we have an overrule then override the existing header to use the // overrule computed name from this point forward @@ -359,27 +374,12 @@ public class GenericFileProducer<T> extends DefaultProducer { } // evaluate the name as a String from the value - String name; - if (expression != null) { - LOG.trace("Filename evaluated as expression: {}", expression); - name = expression.evaluate(exchange, String.class); - } else { - name = exchange.getContext().getTypeConverter().convertTo(String.class, exchange, value); - } - - // flatten name - if (name != null && endpoint.isFlatten()) { - // check for both windows and unix separators - int pos = Math.max(name.lastIndexOf('/'), name.lastIndexOf("\\")); - if (pos != -1) { - name = name.substring(pos + 1); - } - } + final String name = evaluateName(exchange, expression, value); // compute path by adding endpoint starting directory String endpointPath = endpoint.getConfiguration().getDirectory(); String baseDir = ""; - if (endpointPath.length() > 0) { + if (!endpointPath.isEmpty()) { // Its a directory so we should use it as a base path for the // filename // If the path isn't empty, we need to add a trailing / if it isn't @@ -398,16 +398,7 @@ public class GenericFileProducer<T> extends DefaultProducer { } if (endpoint.isJailStartingDirectory()) { - // check for file must be within starting directory (need to compact - // first as the name can be using relative paths via ../ etc) - String compatchAnswer = FileUtil.compactPath(answer); - String compatchBaseDir = FileUtil.compactPath(baseDir); - if (!compatchAnswer.startsWith(compatchBaseDir)) { - throw new IllegalArgumentException( - "Cannot write file with name: " + compatchAnswer - + " as the filename is jailed to the starting directory: " - + compatchBaseDir); - } + jailedCheck(answer, baseDir); } if (endpoint.getConfiguration().needToNormalize()) { @@ -418,6 +409,59 @@ public class GenericFileProducer<T> extends DefaultProducer { return answer; } + private static Object getOverrule(Exchange exchange, Object overrule) { + Object value; + + if (overrule != null) { + if (overrule instanceof Expression) { + value = overrule; + } else { + value = exchange.getContext().getTypeConverter().convertTo(String.class, exchange, overrule); + } + } else { + value = exchange.getIn().getHeader(FileConstants.FILE_NAME); + } + return value; + } + + private static void jailedCheck(String answer, String baseDir) { + // check for file must be within starting directory (need to compact + // first as the name can be using relative paths via ../ etc) + String compatchAnswer = FileUtil.compactPath(answer); + String compatchBaseDir = FileUtil.compactPath(baseDir); + if (!compatchAnswer.startsWith(compatchBaseDir)) { + throw new IllegalArgumentException( + "Cannot write file with name: " + compatchAnswer + + " as the filename is jailed to the starting directory: " + + compatchBaseDir); + } + } + + private String evaluateName(Exchange exchange, Expression expression, Object value) { + String name; + if (expression != null) { + LOG.trace("Filename evaluated as expression: {}", expression); + name = expression.evaluate(exchange, String.class); + } else { + name = exchange.getContext().getTypeConverter().convertTo(String.class, exchange, value); + } + + // flatten name + if (name != null && endpoint.isFlatten()) { + name = flattenName(name); + } + return name; + } + + private static String flattenName(String name) { + // check for both windows and unix separators + int pos = Math.max(name.lastIndexOf('/'), name.lastIndexOf("\\")); + if (pos != -1) { + name = name.substring(pos + 1); + } + return name; + } + public String createTempFileName(Exchange exchange, String fileName) { String answer = fileName; diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileSendDynamicAware.java b/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileSendDynamicAware.java index a3b650cf002..7f086fb1bd4 100644 --- a/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileSendDynamicAware.java +++ b/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileSendDynamicAware.java @@ -65,46 +65,25 @@ public abstract class GenericFileSendDynamicAware extends SendDynamicAwareSuppor Map<String, Object> originalParams = URISupport.parseQuery(URISupport.extractQuery(entry.getOriginalUri())); if (fileName) { - Object val = originalParams.get(PROP_FILE_NAME); - if (val != null) { - params.put(PROP_FILE_NAME, val.toString()); - } + compute(originalParams, PROP_FILE_NAME, params); } if (tempFileName) { - Object val = originalParams.get(PROP_TEMP_FILE_NAME); - if (val != null) { - params.put(PROP_TEMP_FILE_NAME, val.toString()); - } + compute(originalParams, PROP_TEMP_FILE_NAME, params); } if (idempotentKey) { - Object val = originalParams.get(PROP_IDEMPOTENT_KEY); - if (val != null) { - params.put(PROP_IDEMPOTENT_KEY, val.toString()); - } + compute(originalParams, PROP_IDEMPOTENT_KEY, params); } if (move) { - Object val = originalParams.get(PROP_MOVE); - if (val != null) { - params.put(PROP_MOVE, val.toString()); - } + compute(originalParams, PROP_MOVE, params); } if (moveFailed) { - Object val = originalParams.get(PROP_MOVE_FAILED); - if (val != null) { - params.put(PROP_MOVE_FAILED, val.toString()); - } + compute(originalParams, PROP_MOVE_FAILED, params); } if (preMove) { - Object val = originalParams.get(PROP_PRE_MOVE); - if (val != null) { - params.put(PROP_PRE_MOVE, val.toString()); - } + compute(originalParams, PROP_PRE_MOVE, params); } if (moveExisting) { - Object val = originalParams.get(PROP_MOVE_EXISTING); - if (val != null) { - params.put(PROP_MOVE_EXISTING, val.toString()); - } + compute(originalParams, PROP_MOVE_EXISTING, params); } return asEndpointUri(exchange, uri, params); @@ -113,6 +92,13 @@ public abstract class GenericFileSendDynamicAware extends SendDynamicAwareSuppor } } + private static void compute(Map<String, Object> originalParams, String propFileName, Map<String, Object> params) { + Object val = originalParams.get(propFileName); + if (val != null) { + params.put(propFileName, val.toString()); + } + } + @Override public Processor createPreProcessor(Exchange exchange, DynamicAwareEntry entry) throws Exception { return null; diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/strategy/FileChangedExclusiveReadLockStrategy.java b/components/camel-file/src/main/java/org/apache/camel/component/file/strategy/FileChangedExclusiveReadLockStrategy.java index 17f4dfef034..3b966944509 100644 --- a/components/camel-file/src/main/java/org/apache/camel/component/file/strategy/FileChangedExclusiveReadLockStrategy.java +++ b/components/camel-file/src/main/java/org/apache/camel/component/file/strategy/FileChangedExclusiveReadLockStrategy.java @@ -63,12 +63,7 @@ public class FileChangedExclusiveReadLockStrategy extends MarkerFileExclusiveRea while (!exclusive) { // timeout check if (timeout > 0) { - long delta = watch.taken(); - if (delta > timeout) { - CamelLogger.log(LOG, readLockLoggingLevel, - "Cannot acquire read lock within " + timeout + " millis. Will skip the file: " + file); - // we could not get the lock within the timeout period, so - // return false + if (isTimedOut(watch, target, timeout, readLockLoggingLevel)) { return false; } } diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/strategy/FileLockExclusiveReadLockStrategy.java b/components/camel-file/src/main/java/org/apache/camel/component/file/strategy/FileLockExclusiveReadLockStrategy.java index 04a77a6e5f3..76df01ba234 100644 --- a/components/camel-file/src/main/java/org/apache/camel/component/file/strategy/FileLockExclusiveReadLockStrategy.java +++ b/components/camel-file/src/main/java/org/apache/camel/component/file/strategy/FileLockExclusiveReadLockStrategy.java @@ -80,12 +80,7 @@ public class FileLockExclusiveReadLockStrategy extends MarkerFileExclusiveReadLo while (!exclusive) { // timeout check if (timeout > 0) { - long delta = watch.taken(); - if (delta > timeout) { - CamelLogger.log(LOG, readLockLoggingLevel, - "Cannot acquire read lock within " + timeout + " millis. Will skip the file: " + target); - // we could not get the lock within the timeout period, - // so return false + if (isTimedOut(watch, target, timeout, readLockLoggingLevel)) { return false; } } @@ -121,15 +116,7 @@ public class FileLockExclusiveReadLockStrategy extends MarkerFileExclusiveReadLo // somehow hold a lock to a file // such as AntiVirus or MS Office that has special locks for it's // supported files - if (timeout == 0) { - // if not using timeout, then we cant retry, so return false - return false; - } - LOG.debug("Cannot acquire read lock. Will try again.", e); - boolean interrupted = sleep(); - if (interrupted) { - // we were interrupted while sleeping, we are likely being - // shutdown so return false + if (handleIOException(e)) { return false; } } finally { @@ -152,6 +139,21 @@ public class FileLockExclusiveReadLockStrategy extends MarkerFileExclusiveReadLo return true; } + private boolean handleIOException(IOException e) { + if (timeout == 0) { + // if not using timeout, then we cant retry, so return false + return true; + } + LOG.debug("Cannot acquire read lock. Will try again.", e); + boolean interrupted = sleep(); + if (interrupted) { + // we were interrupted while sleeping, we are likely being + // shutdown so return false + return true; + } + return false; + } + @Override protected void doReleaseExclusiveReadLock(GenericFile<File> file, Exchange exchange) throws Exception { diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java b/components/camel-file/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java index df916396d87..a1920ea5802 100644 --- a/components/camel-file/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java +++ b/components/camel-file/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java @@ -45,55 +45,72 @@ public final class FileProcessStrategyFactory implements GenericFileProcessStrat boolean isMove = moveExpression != null || preMoveExpression != null || moveFailedExpression != null; if (isDelete) { - GenericFileDeleteProcessStrategy<File> strategy = new GenericFileDeleteProcessStrategy<>(); - strategy.setExclusiveReadLockStrategy(getExclusiveReadLockStrategy(params)); - if (preMoveExpression != null) { - GenericFileExpressionRenamer<File> renamer = new GenericFileExpressionRenamer<>(); - renamer.setExpression(preMoveExpression); - strategy.setBeginRenamer(renamer); - } - if (moveFailedExpression != null) { - GenericFileExpressionRenamer<File> renamer = new GenericFileExpressionRenamer<>(); - renamer.setExpression(moveFailedExpression); - strategy.setFailureRenamer(renamer); - } - return strategy; + return newGenericFileDeleteProcessStrategy(params, preMoveExpression, moveFailedExpression); } else if (isMove || isNoop) { - GenericFileRenameProcessStrategy<File> strategy = new GenericFileRenameProcessStrategy<>(); - strategy.setExclusiveReadLockStrategy(getExclusiveReadLockStrategy(params)); - if (!isNoop) { - // move on commit is only possible if not noop - if (moveExpression != null) { - GenericFileExpressionRenamer<File> renamer = new GenericFileExpressionRenamer<>(); - renamer.setExpression(moveExpression); - strategy.setCommitRenamer(renamer); - } else { - strategy.setCommitRenamer(getDefaultCommitRenamer(context)); - } - } - // both move and noop supports pre move - if (preMoveExpression != null) { - GenericFileExpressionRenamer<File> renamer = new GenericFileExpressionRenamer<>(); - renamer.setExpression(preMoveExpression); - strategy.setBeginRenamer(renamer); - } - // both move and noop supports move failed - if (moveFailedExpression != null) { - GenericFileExpressionRenamer<File> renamer = new GenericFileExpressionRenamer<>(); - renamer.setExpression(moveFailedExpression); - strategy.setFailureRenamer(renamer); - } - return strategy; + return newGenericFileRenameProcessStrategy(context, params, isNoop, moveExpression, preMoveExpression, + moveFailedExpression); } else { // default strategy will move files in a .camel/ subfolder where the // file was consumed - GenericFileRenameProcessStrategy<File> strategy = new GenericFileRenameProcessStrategy<>(); - strategy.setExclusiveReadLockStrategy(getExclusiveReadLockStrategy(params)); - strategy.setCommitRenamer(getDefaultCommitRenamer(context)); - return strategy; + return newGenericFileRenameProcessStrategy(context, params); } } + private static GenericFileRenameProcessStrategy<File> newGenericFileRenameProcessStrategy( + CamelContext context, Map<String, Object> params) { + GenericFileRenameProcessStrategy<File> strategy = new GenericFileRenameProcessStrategy<>(); + strategy.setExclusiveReadLockStrategy(getExclusiveReadLockStrategy(params)); + strategy.setCommitRenamer(getDefaultCommitRenamer(context)); + return strategy; + } + + private static GenericFileRenameProcessStrategy<File> newGenericFileRenameProcessStrategy( + CamelContext context, Map<String, Object> params, boolean isNoop, Expression moveExpression, + Expression preMoveExpression, Expression moveFailedExpression) { + GenericFileRenameProcessStrategy<File> strategy = new GenericFileRenameProcessStrategy<>(); + strategy.setExclusiveReadLockStrategy(getExclusiveReadLockStrategy(params)); + if (!isNoop) { + // move on commit is only possible if not noop + if (moveExpression != null) { + GenericFileExpressionRenamer<File> renamer = new GenericFileExpressionRenamer<>(); + renamer.setExpression(moveExpression); + strategy.setCommitRenamer(renamer); + } else { + strategy.setCommitRenamer(getDefaultCommitRenamer(context)); + } + } + // both move and noop supports pre move + if (preMoveExpression != null) { + GenericFileExpressionRenamer<File> renamer = new GenericFileExpressionRenamer<>(); + renamer.setExpression(preMoveExpression); + strategy.setBeginRenamer(renamer); + } + // both move and noop supports move failed + if (moveFailedExpression != null) { + GenericFileExpressionRenamer<File> renamer = new GenericFileExpressionRenamer<>(); + renamer.setExpression(moveFailedExpression); + strategy.setFailureRenamer(renamer); + } + return strategy; + } + + private static GenericFileDeleteProcessStrategy<File> newGenericFileDeleteProcessStrategy( + Map<String, Object> params, Expression preMoveExpression, Expression moveFailedExpression) { + GenericFileDeleteProcessStrategy<File> strategy = new GenericFileDeleteProcessStrategy<>(); + strategy.setExclusiveReadLockStrategy(getExclusiveReadLockStrategy(params)); + if (preMoveExpression != null) { + GenericFileExpressionRenamer<File> renamer = new GenericFileExpressionRenamer<>(); + renamer.setExpression(preMoveExpression); + strategy.setBeginRenamer(renamer); + } + if (moveFailedExpression != null) { + GenericFileExpressionRenamer<File> renamer = new GenericFileExpressionRenamer<>(); + renamer.setExpression(moveFailedExpression); + strategy.setFailureRenamer(renamer); + } + return strategy; + } + private static GenericFileExpressionRenamer<File> getDefaultCommitRenamer(CamelContext context) { // use context to lookup language to let it be loose coupled Language language = context.resolveLanguage("file"); @@ -111,6 +128,12 @@ public final class FileProcessStrategyFactory implements GenericFileProcessStrat } // no explicit strategy set then fallback to readLock option + return fallbackToReadLock(params); + } + + private static GenericFileExclusiveReadLockStrategy<File> fallbackToReadLock( + Map<String, Object> params) { + GenericFileExclusiveReadLockStrategy<File> strategy = null; String readLock = (String) params.get("readLock"); if (ObjectHelper.isNotEmpty(readLock)) { if ("none".equals(readLock) || "false".equals(readLock)) { @@ -122,131 +145,163 @@ public final class FileProcessStrategyFactory implements GenericFileProcessStrat } else if ("rename".equals(readLock)) { strategy = new FileRenameExclusiveReadLockStrategy(); } else if ("changed".equals(readLock)) { - FileChangedExclusiveReadLockStrategy readLockStrategy = new FileChangedExclusiveReadLockStrategy(); - Long minLength = (Long) params.get("readLockMinLength"); - if (minLength != null) { - readLockStrategy.setMinLength(minLength); - } - Long minAge = (Long) params.get("readLockMinAge"); - if (null != minAge) { - readLockStrategy.setMinAge(minAge); - } - strategy = readLockStrategy; + strategy = newStrategyForChanged(params); } else if ("idempotent".equals(readLock)) { - FileIdempotentRepositoryReadLockStrategy readLockStrategy = new FileIdempotentRepositoryReadLockStrategy(); - Boolean readLockRemoveOnRollback = (Boolean) params.get("readLockRemoveOnRollback"); - if (readLockRemoveOnRollback != null) { - readLockStrategy.setRemoveOnRollback(readLockRemoveOnRollback); - } - Boolean readLockRemoveOnCommit = (Boolean) params.get("readLockRemoveOnCommit"); - if (readLockRemoveOnCommit != null) { - readLockStrategy.setRemoveOnCommit(readLockRemoveOnCommit); - } - IdempotentRepository repo = (IdempotentRepository) params.get("readLockIdempotentRepository"); - if (repo != null) { - readLockStrategy.setIdempotentRepository(repo); - } - Integer readLockIdempotentReleaseDelay = (Integer) params.get("readLockIdempotentReleaseDelay"); - if (readLockIdempotentReleaseDelay != null) { - readLockStrategy.setReadLockIdempotentReleaseDelay(readLockIdempotentReleaseDelay); - } - Boolean readLockIdempotentReleaseAsync = (Boolean) params.get("readLockIdempotentReleaseAsync"); - if (readLockIdempotentReleaseAsync != null) { - readLockStrategy.setReadLockIdempotentReleaseAsync(readLockIdempotentReleaseAsync); - } - Integer readLockIdempotentReleaseAsyncPoolSize = (Integer) params.get("readLockIdempotentReleaseAsyncPoolSize"); - if (readLockIdempotentReleaseAsyncPoolSize != null) { - readLockStrategy.setReadLockIdempotentReleaseAsyncPoolSize(readLockIdempotentReleaseAsyncPoolSize); - } - ScheduledExecutorService readLockIdempotentReleaseExecutorService - = (ScheduledExecutorService) params.get("readLockIdempotentReleaseExecutorService"); - if (readLockIdempotentReleaseExecutorService != null) { - readLockStrategy.setReadLockIdempotentReleaseExecutorService(readLockIdempotentReleaseExecutorService); - } - strategy = readLockStrategy; + strategy = newStrategyForIdempotent(params); } else if ("idempotent-changed".equals(readLock)) { - FileIdempotentChangedRepositoryReadLockStrategy readLockStrategy - = new FileIdempotentChangedRepositoryReadLockStrategy(); - Boolean readLockRemoveOnRollback = (Boolean) params.get("readLockRemoveOnRollback"); - if (readLockRemoveOnRollback != null) { - readLockStrategy.setRemoveOnRollback(readLockRemoveOnRollback); - } - Boolean readLockRemoveOnCommit = (Boolean) params.get("readLockRemoveOnCommit"); - if (readLockRemoveOnCommit != null) { - readLockStrategy.setRemoveOnCommit(readLockRemoveOnCommit); - } - IdempotentRepository repo = (IdempotentRepository) params.get("readLockIdempotentRepository"); - if (repo != null) { - readLockStrategy.setIdempotentRepository(repo); - } - Long minLength = (Long) params.get("readLockMinLength"); - if (minLength != null) { - readLockStrategy.setMinLength(minLength); - } - Long minAge = (Long) params.get("readLockMinAge"); - if (null != minAge) { - readLockStrategy.setMinAge(minAge); - } - Integer readLockIdempotentReleaseDelay = (Integer) params.get("readLockIdempotentReleaseDelay"); - if (readLockIdempotentReleaseDelay != null) { - readLockStrategy.setReadLockIdempotentReleaseDelay(readLockIdempotentReleaseDelay); - } - Boolean readLockIdempotentReleaseAsync = (Boolean) params.get("readLockIdempotentReleaseAsync"); - if (readLockIdempotentReleaseAsync != null) { - readLockStrategy.setReadLockIdempotentReleaseAsync(readLockIdempotentReleaseAsync); - } - Integer readLockIdempotentReleaseAsyncPoolSize = (Integer) params.get("readLockIdempotentReleaseAsyncPoolSize"); - if (readLockIdempotentReleaseAsyncPoolSize != null) { - readLockStrategy.setReadLockIdempotentReleaseAsyncPoolSize(readLockIdempotentReleaseAsyncPoolSize); - } - ScheduledExecutorService readLockIdempotentReleaseExecutorService - = (ScheduledExecutorService) params.get("readLockIdempotentReleaseExecutorService"); - if (readLockIdempotentReleaseExecutorService != null) { - readLockStrategy.setReadLockIdempotentReleaseExecutorService(readLockIdempotentReleaseExecutorService); - } - strategy = readLockStrategy; + strategy = newStrategyForIdempotentChanged(params); } else if ("idempotent-rename".equals(readLock)) { - FileIdempotentRenameRepositoryReadLockStrategy readLockStrategy - = new FileIdempotentRenameRepositoryReadLockStrategy(); - Boolean readLockRemoveOnRollback = (Boolean) params.get("readLockRemoveOnRollback"); - if (readLockRemoveOnRollback != null) { - readLockStrategy.setRemoveOnRollback(readLockRemoveOnRollback); - } - Boolean readLockRemoveOnCommit = (Boolean) params.get("readLockRemoveOnCommit"); - if (readLockRemoveOnCommit != null) { - readLockStrategy.setRemoveOnCommit(readLockRemoveOnCommit); - } - IdempotentRepository repo = (IdempotentRepository) params.get("readLockIdempotentRepository"); - if (repo != null) { - readLockStrategy.setIdempotentRepository(repo); - } - strategy = readLockStrategy; + strategy = newStrategyForIdempotentRename(params); } if (strategy != null) { - Long timeout = (Long) params.get("readLockTimeout"); - if (timeout != null) { - strategy.setTimeout(timeout); - } - Long checkInterval = (Long) params.get("readLockCheckInterval"); - if (checkInterval != null) { - strategy.setCheckInterval(checkInterval); - } - LoggingLevel readLockLoggingLevel = (LoggingLevel) params.get("readLockLoggingLevel"); - if (readLockLoggingLevel != null) { - strategy.setReadLockLoggingLevel(readLockLoggingLevel); - } - Boolean readLockMarkerFile = (Boolean) params.get("readLockMarkerFile"); - if (readLockMarkerFile != null) { - strategy.setMarkerFiler(readLockMarkerFile); - } - Boolean readLockDeleteOrphanLockFiles = (Boolean) params.get("readLockDeleteOrphanLockFiles"); - if (readLockDeleteOrphanLockFiles != null) { - strategy.setDeleteOrphanLockFiles(readLockDeleteOrphanLockFiles); - } + setupStrategy(params, strategy); } } return strategy; } + + private static void setupStrategy(Map<String, Object> params, GenericFileExclusiveReadLockStrategy<File> strategy) { + Long timeout = (Long) params.get("readLockTimeout"); + if (timeout != null) { + strategy.setTimeout(timeout); + } + Long checkInterval = (Long) params.get("readLockCheckInterval"); + if (checkInterval != null) { + strategy.setCheckInterval(checkInterval); + } + LoggingLevel readLockLoggingLevel = (LoggingLevel) params.get("readLockLoggingLevel"); + if (readLockLoggingLevel != null) { + strategy.setReadLockLoggingLevel(readLockLoggingLevel); + } + Boolean readLockMarkerFile = (Boolean) params.get("readLockMarkerFile"); + if (readLockMarkerFile != null) { + strategy.setMarkerFiler(readLockMarkerFile); + } + Boolean readLockDeleteOrphanLockFiles = (Boolean) params.get("readLockDeleteOrphanLockFiles"); + if (readLockDeleteOrphanLockFiles != null) { + strategy.setDeleteOrphanLockFiles(readLockDeleteOrphanLockFiles); + } + } + + private static GenericFileExclusiveReadLockStrategy<File> newStrategyForChanged( + Map<String, Object> params) { + GenericFileExclusiveReadLockStrategy<File> strategy; + FileChangedExclusiveReadLockStrategy readLockStrategy = new FileChangedExclusiveReadLockStrategy(); + Long minLength = (Long) params.get("readLockMinLength"); + if (minLength != null) { + readLockStrategy.setMinLength(minLength); + } + Long minAge = (Long) params.get("readLockMinAge"); + if (null != minAge) { + readLockStrategy.setMinAge(minAge); + } + strategy = readLockStrategy; + return strategy; + } + + private static GenericFileExclusiveReadLockStrategy<File> newStrategyForIdempotentRename( + Map<String, Object> params) { + GenericFileExclusiveReadLockStrategy<File> strategy; + FileIdempotentRenameRepositoryReadLockStrategy readLockStrategy + = new FileIdempotentRenameRepositoryReadLockStrategy(); + Boolean readLockRemoveOnRollback = (Boolean) params.get("readLockRemoveOnRollback"); + if (readLockRemoveOnRollback != null) { + readLockStrategy.setRemoveOnRollback(readLockRemoveOnRollback); + } + Boolean readLockRemoveOnCommit = (Boolean) params.get("readLockRemoveOnCommit"); + if (readLockRemoveOnCommit != null) { + readLockStrategy.setRemoveOnCommit(readLockRemoveOnCommit); + } + IdempotentRepository repo = (IdempotentRepository) params.get("readLockIdempotentRepository"); + if (repo != null) { + readLockStrategy.setIdempotentRepository(repo); + } + strategy = readLockStrategy; + return strategy; + } + + private static GenericFileExclusiveReadLockStrategy<File> newStrategyForIdempotentChanged( + Map<String, Object> params) { + GenericFileExclusiveReadLockStrategy<File> strategy; + FileIdempotentChangedRepositoryReadLockStrategy readLockStrategy + = new FileIdempotentChangedRepositoryReadLockStrategy(); + Boolean readLockRemoveOnRollback = (Boolean) params.get("readLockRemoveOnRollback"); + if (readLockRemoveOnRollback != null) { + readLockStrategy.setRemoveOnRollback(readLockRemoveOnRollback); + } + Boolean readLockRemoveOnCommit = (Boolean) params.get("readLockRemoveOnCommit"); + if (readLockRemoveOnCommit != null) { + readLockStrategy.setRemoveOnCommit(readLockRemoveOnCommit); + } + IdempotentRepository repo = (IdempotentRepository) params.get("readLockIdempotentRepository"); + if (repo != null) { + readLockStrategy.setIdempotentRepository(repo); + } + Long minLength = (Long) params.get("readLockMinLength"); + if (minLength != null) { + readLockStrategy.setMinLength(minLength); + } + Long minAge = (Long) params.get("readLockMinAge"); + if (null != minAge) { + readLockStrategy.setMinAge(minAge); + } + Integer readLockIdempotentReleaseDelay = (Integer) params.get("readLockIdempotentReleaseDelay"); + if (readLockIdempotentReleaseDelay != null) { + readLockStrategy.setReadLockIdempotentReleaseDelay(readLockIdempotentReleaseDelay); + } + Boolean readLockIdempotentReleaseAsync = (Boolean) params.get("readLockIdempotentReleaseAsync"); + if (readLockIdempotentReleaseAsync != null) { + readLockStrategy.setReadLockIdempotentReleaseAsync(readLockIdempotentReleaseAsync); + } + Integer readLockIdempotentReleaseAsyncPoolSize = (Integer) params.get("readLockIdempotentReleaseAsyncPoolSize"); + if (readLockIdempotentReleaseAsyncPoolSize != null) { + readLockStrategy.setReadLockIdempotentReleaseAsyncPoolSize(readLockIdempotentReleaseAsyncPoolSize); + } + ScheduledExecutorService readLockIdempotentReleaseExecutorService + = (ScheduledExecutorService) params.get("readLockIdempotentReleaseExecutorService"); + if (readLockIdempotentReleaseExecutorService != null) { + readLockStrategy.setReadLockIdempotentReleaseExecutorService(readLockIdempotentReleaseExecutorService); + } + strategy = readLockStrategy; + return strategy; + } + + private static GenericFileExclusiveReadLockStrategy<File> newStrategyForIdempotent( + Map<String, Object> params) { + GenericFileExclusiveReadLockStrategy<File> strategy; + FileIdempotentRepositoryReadLockStrategy readLockStrategy = new FileIdempotentRepositoryReadLockStrategy(); + Boolean readLockRemoveOnRollback = (Boolean) params.get("readLockRemoveOnRollback"); + if (readLockRemoveOnRollback != null) { + readLockStrategy.setRemoveOnRollback(readLockRemoveOnRollback); + } + Boolean readLockRemoveOnCommit = (Boolean) params.get("readLockRemoveOnCommit"); + if (readLockRemoveOnCommit != null) { + readLockStrategy.setRemoveOnCommit(readLockRemoveOnCommit); + } + IdempotentRepository repo = (IdempotentRepository) params.get("readLockIdempotentRepository"); + if (repo != null) { + readLockStrategy.setIdempotentRepository(repo); + } + Integer readLockIdempotentReleaseDelay = (Integer) params.get("readLockIdempotentReleaseDelay"); + if (readLockIdempotentReleaseDelay != null) { + readLockStrategy.setReadLockIdempotentReleaseDelay(readLockIdempotentReleaseDelay); + } + Boolean readLockIdempotentReleaseAsync = (Boolean) params.get("readLockIdempotentReleaseAsync"); + if (readLockIdempotentReleaseAsync != null) { + readLockStrategy.setReadLockIdempotentReleaseAsync(readLockIdempotentReleaseAsync); + } + Integer readLockIdempotentReleaseAsyncPoolSize = (Integer) params.get("readLockIdempotentReleaseAsyncPoolSize"); + if (readLockIdempotentReleaseAsyncPoolSize != null) { + readLockStrategy.setReadLockIdempotentReleaseAsyncPoolSize(readLockIdempotentReleaseAsyncPoolSize); + } + ScheduledExecutorService readLockIdempotentReleaseExecutorService + = (ScheduledExecutorService) params.get("readLockIdempotentReleaseExecutorService"); + if (readLockIdempotentReleaseExecutorService != null) { + readLockStrategy.setReadLockIdempotentReleaseExecutorService(readLockIdempotentReleaseExecutorService); + } + strategy = readLockStrategy; + return strategy; + } } diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/strategy/MarkerFileExclusiveReadLockStrategy.java b/components/camel-file/src/main/java/org/apache/camel/component/file/strategy/MarkerFileExclusiveReadLockStrategy.java index 82650398f48..da924dc71ce 100644 --- a/components/camel-file/src/main/java/org/apache/camel/component/file/strategy/MarkerFileExclusiveReadLockStrategy.java +++ b/components/camel-file/src/main/java/org/apache/camel/component/file/strategy/MarkerFileExclusiveReadLockStrategy.java @@ -27,6 +27,7 @@ import org.apache.camel.component.file.GenericFileEndpoint; import org.apache.camel.component.file.GenericFileExclusiveReadLockStrategy; import org.apache.camel.component.file.GenericFileFilter; import org.apache.camel.component.file.GenericFileOperations; +import org.apache.camel.spi.CamelLogger; import org.apache.camel.util.FileUtil; import org.apache.camel.util.StopWatch; import org.apache.camel.util.StringHelper; @@ -320,4 +321,16 @@ public class MarkerFileExclusiveReadLockStrategy implements GenericFileExclusive return path + "-" + key; } + protected static boolean isTimedOut(StopWatch watch, File target, long timeout, LoggingLevel readLockLoggingLevel) { + long delta = watch.taken(); + if (delta > timeout) { + CamelLogger.log(LOG, readLockLoggingLevel, + "Cannot acquire read lock within " + timeout + " millis. Will skip the file: " + target); + // we could not get the lock within the timeout period, + // so return false + return true; + } + return false; + } + }