Author: davsclaus
Date: Sun Jul 27 10:46:28 2008
New Revision: 680159
URL: http://svn.apache.org/viewvc?rev=680159&view=rev
Log:
Polished the exclusive read lock for FileConsumer, FtpConsumer and SftpConsumer.
It cannot be unit tested when the test and Camel run in the same JVM, because
file locks are held on behalf of the entire JVM (see the FileLock javadoc).
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExclusiveReadTest.java
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpExclusiveReadTest.java
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java?rev=680159&r1=680158&r2=680159&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java
(original)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java
Sun Jul 27 10:46:28 2008
@@ -141,7 +141,7 @@
boolean handled =
DeadLetterChannel.isFailureHandled(exchange);
if (LOG.isDebugEnabled()) {
- LOG.debug("Done processing file: " + file + ".
Status is: " + (failed ? "failed: " + failed + ", handled by failure processor:
" + handled : "OK"));
+ LOG.debug("Done processing file: " + file + ".
Status is: " + (failed ? "failed: " + failed + ", handled by failure processor:
" + handled : "processed OK"));
}
if (!failed || handled) {
@@ -170,26 +170,24 @@
protected void acquireExclusiveRead(File file) throws IOException {
if (LOG.isTraceEnabled()) {
- LOG.trace("Acquiring exclusive read (avoid reading file that is in
progress of being written) to " + file);
+ LOG.trace("Waiting for exclusive lock to file: " + file);
}
// try to acquire rw lock on the file before we can consume it
FileChannel channel = new RandomAccessFile(file, "rw").getChannel();
try {
FileLock lock = channel.lock();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Acquired exclusive lock: " + lock + " to file: " +
file);
+ }
// just release it now we dont want to hold it during the rest of
the processing
lock.release();
} finally {
// must close channel
- ObjectHelper.close(channel, "FileConsumer during acquiring of
exclusive read", LOG);
- }
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Acquired exclusive read to: " + file);
+ ObjectHelper.close(channel, "FileConsumer during acquiring of
exclusive lock", LOG);
}
}
-
/**
* Strategy when the file was processed and a commit should be executed.
*
Modified:
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExclusiveReadTest.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExclusiveReadTest.java?rev=680159&r1=680158&r2=680159&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExclusiveReadTest.java
(original)
+++
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExclusiveReadTest.java
Sun Jul 27 10:46:28 2008
@@ -53,13 +53,13 @@
mock.assertIsSatisfied();
}
- // TODO: Fix on Bamboo
+ // TODO: Not possible to test in the same JVM (see javadoc for FileLock)
public void xxxtestPollFileWhileSlowFileIsBeingWritten() throws Exception {
deleteDirectory("./target/exclusiveread");
createDirectory("./target/exclusiveread/slowfile");
MockEndpoint mock = getMockEndpoint("mock:result");
mock.expectedMessageCount(1);
- mock.expectedBodiesReceived("Hello World");
+ mock.expectedBodiesReceived("Hello WorldLine #0Line #1Line #2Bye
Worl");
createSlowFile();
@@ -68,24 +68,17 @@
private void createSlowFile() throws Exception {
LOG.info("Creating a slow file ...");
-
File file = new File("./target/exclusiveread/slowfile/hello.txt");
FileOutputStream fos = new FileOutputStream(file);
-
- // get a lock so we are the only one working on this file
FileLock lock = fos.getChannel().lock();
-
- byte[] buffer = "Hello World".getBytes();
- ByteBuffer bb = ByteBuffer.wrap(buffer);
- for (int i = 0; i < buffer.length; i++) {
+ fos.write("Hello World".getBytes());
+ for (int i = 0; i < 3; i++) {
+ Thread.sleep(1000);
+ fos.write(("Line #" + i).getBytes());
LOG.info("Appending to slowfile");
- Thread.sleep(300);
}
- LOG.info("Writing to file");
- fos.write(buffer);
- LOG.info("Releasing lock");
+ fos.write("Bye World".getBytes());
lock.release();
- LOG.info("Closing file");
fos.close();
LOG.info("... done creating slowfile");
}
Modified:
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java?rev=680159&r1=680158&r2=680159&view=diff
==============================================================================
---
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java
(original)
+++
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java
Sun Jul 27 10:46:28 2008
@@ -210,20 +210,25 @@
}
protected void acquireExclusiveRead(FTPClient client, FTPFile ftpFile)
throws IOException {
- LOG.trace("Acquiring exclusive read (avoid reading file that is in
progress of being written)");
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Waiting for exclusive lock to file: " + ftpFile);
+ }
// the trick is to try to rename the file, if we can rename then we
have exclusive read
- // since its a remote file we can not use java.nio to get a RW access
+ // since its a remote file we can not use java.nio to get a RW lock
String originalName = ftpFile.getName();
String newName = originalName + ".camelExclusiveRead";
boolean exclusive = false;
while (! exclusive) {
exclusive = client.rename(originalName, newName);
if (exclusive) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Acquired exclusive lock to file: " +
originalName);
+ }
// rename it back so we can read it
client.rename(newName, originalName);
} else {
- LOG.trace("Exclusive read not granted. Sleeping for 1000
millis");
+ LOG.trace("Exclusive lock not granted. Sleeping for 1000
millis.");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
@@ -231,9 +236,6 @@
}
}
}
- if (LOG.isDebugEnabled()) {
- LOG.debug("Acquired exclusive read to: " + originalName);
- }
}
private String remoteServer() {
Modified:
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java?rev=680159&r1=680158&r2=680159&view=diff
==============================================================================
---
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java
(original)
+++
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java
Sun Jul 27 10:46:28 2008
@@ -220,7 +220,9 @@
}
protected void acquireExclusiveRead(ChannelSftp.LsEntry sftpFile) throws
SftpException {
- LOG.trace("Acquiring exclusive read (avoid reading file that is in
progress of being written)");
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Waiting for exclusive lock to file: " + sftpFile);
+ }
// the trick is to try to rename the file, if we can rename then we
have exclusive read
// since its a remote file we can not use java.nio to get a RW access
@@ -236,10 +238,13 @@
}
if (exclusive) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Acquired exclusive lock to file: " +
originalName);
+ }
// rename it back so we can read it
channel.rename(newName, originalName);
} else {
- LOG.trace("Exclusive read not granted. Sleeping for 1000
millis");
+ LOG.trace("Exclusive lock not granted. Sleeping for 1000
millis");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
@@ -247,9 +252,6 @@
}
}
}
- if (LOG.isDebugEnabled()) {
- LOG.debug("Acquired exclusive read to: " + originalName);
- }
}
private String remoteServer() {
Modified:
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpExclusiveReadTest.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpExclusiveReadTest.java?rev=680159&r1=680158&r2=680159&view=diff
==============================================================================
---
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpExclusiveReadTest.java
(original)
+++
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpExclusiveReadTest.java
Sun Jul 27 10:46:28 2008
@@ -38,7 +38,7 @@
return port;
}
- // TODO fix on Bamboo
+ // TODO: Not possible to test in single JVM
public void testPollFileWhileSlowFileIsBeingWritten() throws Exception {
/*deleteDirectory("./res/home");
createDirectory("./res/home/slowfile");