Author: davsclaus
Date: Sun Sep 21 04:02:51 2008
New Revision: 697472
URL: http://svn.apache.org/viewvc?rev=697472&view=rev
Log:
CAMEL-925: file consumer throws IOException if rename or delete consumed file
failed
Added:
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileAlwaysConsumeFalseTest.java
- copied, changed from r697361,
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileAlwaysConsumeTest.java
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/DeleteFileProcessStrategy.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategySupport.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/RenameFileProcessStrategy.java
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileAlwaysConsumeTest.java
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/DeleteFileProcessStrategy.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/DeleteFileProcessStrategy.java?rev=697472&r1=697471&r2=697472&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/DeleteFileProcessStrategy.java
(original)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/DeleteFileProcessStrategy.java
Sun Sep 21 04:02:51 2008
@@ -17,6 +17,7 @@
package org.apache.camel.component.file.strategy;
import java.io.File;
+import java.io.IOException;
import org.apache.camel.component.file.FileEndpoint;
import org.apache.camel.component.file.FileExchange;
@@ -45,7 +46,7 @@
}
boolean deleted = file.delete();
if (!deleted) {
- LOG.warn("Could not delete file: " + file);
+ throw new IOException("Can not delete file: " + file);
}
// must commit to release the lock
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategySupport.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategySupport.java?rev=697472&r1=697471&r2=697472&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategySupport.java
(original)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategySupport.java
Sun Sep 21 04:02:51 2008
@@ -26,6 +26,7 @@
import org.apache.camel.component.file.FileExchange;
import org.apache.camel.component.file.FileProcessStrategy;
import org.apache.camel.util.ExchangeHelper;
+import org.apache.camel.util.ObjectHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -63,12 +64,12 @@
FileChannel channel = new RandomAccessFile(lockFileName,
"rw").getChannel();
FileLock lock = channel.lock();
if (lock != null) {
- exchange.setProperty("org.apache.camel.fileChannel", channel);
exchange.setProperty("org.apache.camel.file.lock", lock);
exchange.setProperty("org.apache.camel.file.lock.name",
lockFileName);
return true;
+ } else {
+ return false;
}
- return false;
}
return true;
}
@@ -81,7 +82,7 @@
try {
unlockFile(endpoint, exchange, file);
} catch (Exception e) {
- LOG.info("Unable to unlock file: " + file + ": " + e.getMessage(),
e);
+ LOG.warn("Unable to unlock file: " + file, e);
}
}
@@ -103,14 +104,24 @@
protected void unlockFile(FileEndpoint endpoint, FileExchange exchange,
File file) throws Exception {
if (isLockFile()) {
- Channel channel = ExchangeHelper.getMandatoryProperty(exchange,
"org.apache.camel.fileChannel", Channel.class);
- String lockfile = ExchangeHelper.getMandatoryProperty(exchange,
"org.apache.camel.file.lock.name", String.class);
+ FileLock lock = ExchangeHelper.getMandatoryProperty(exchange,
"org.apache.camel.file.lock", FileLock.class);
+ String lockFileName =
ExchangeHelper.getMandatoryProperty(exchange,
"org.apache.camel.file.lock.name", String.class);
+ Channel channel = lock.channel();
if (LOG.isDebugEnabled()) {
LOG.debug("Unlocking file: " + file);
}
- channel.close();
- File lock = new File(lockfile);
- lock.delete();
+ try {
+ lock.release();
+ } finally {
+ // must close channel
+ ObjectHelper.close(channel, "Closing channel", LOG);
+
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Deleting lock file: " + lockFileName);
+ }
+ File lockfile = new File(lockFileName);
+ lockfile.delete();
+ }
}
}
}
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/RenameFileProcessStrategy.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/RenameFileProcessStrategy.java?rev=697472&r1=697471&r2=697472&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/RenameFileProcessStrategy.java
(original)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/RenameFileProcessStrategy.java
Sun Sep 21 04:02:51 2008
@@ -17,6 +17,7 @@
package org.apache.camel.component.file.strategy;
import java.io.File;
+import java.io.IOException;
import org.apache.camel.component.file.FileEndpoint;
import org.apache.camel.component.file.FileExchange;
@@ -52,10 +53,6 @@
@Override
public void commit(FileEndpoint endpoint, FileExchange exchange, File
file) throws Exception {
File newName = renamer.renameFile(exchange, file);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Renaming file: " + file + " to: " + newName);
- }
-
// deleting any existing files before renaming
if (newName.exists()) {
newName.delete();
@@ -64,9 +61,12 @@
// make parent folder if missing
newName.getParentFile().mkdirs();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Renaming file: " + file + " to: " + newName);
+ }
boolean renamed = file.renameTo(newName);
if (!renamed) {
- LOG.warn("Could not rename file from: " + file + " to " + newName);
+ throw new IOException("Can not rename file from: " + file + " to:
" + newName);
}
// must commit to release the lock
Copied:
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileAlwaysConsumeFalseTest.java
(from r697361,
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileAlwaysConsumeTest.java)
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileAlwaysConsumeFalseTest.java?p2=activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileAlwaysConsumeFalseTest.java&p1=activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileAlwaysConsumeTest.java&r1=697361&r2=697472&rev=697472&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileAlwaysConsumeTest.java
(original)
+++
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileAlwaysConsumeFalseTest.java
Sun Sep 21 04:02:51 2008
@@ -23,9 +23,9 @@
import org.apache.camel.component.mock.MockEndpoint;
/**
- * Unit test for the alwaysConsume option.
+ * Unit test for the alwaysConsume=false option.
*/
-public class FileAlwaysConsumeTest extends ContextTestSupport {
+public class FileAlwaysConsumeFalseTest extends ContextTestSupport {
@Override
protected void setUp() throws Exception {
@@ -34,44 +34,16 @@
template.sendBodyAndHeader("file://target/alwaysconsume/", "Hello
World", FileComponent.HEADER_FILE_NAME, "report.txt");
}
- public void testAlwaysConsume() throws Exception {
- context.addRoutes(new RouteBuilder() {
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
public void configure() throws Exception {
-
from("file://target/alwaysconsume/?consumer.alwaysConsume=true&moveNamePrefix=done/").to("mock:result");
+
from("file://target/alwaysconsume/?consumer.alwaysConsume=false&moveNamePrefix=done/").to("mock:result");
}
- });
-
- // consume the file the first time
- MockEndpoint mock = getMockEndpoint("mock:result");
- mock.expectedBodiesReceived("Hello World");
- mock.expectedMessageCount(1);
-
- assertMockEndpointsSatisfied();
-
- Thread.sleep(1000);
-
- // reset mock and set new expectations
- mock.reset();
- mock.expectedBodiesReceived("Hello World");
- mock.expectedMessageCount(1);
-
- // move file back
- File file = new File("target/alwaysconsume/done/report.txt");
- File renamed = new File("target/alwaysconsume/report.txt");
- file = file.getAbsoluteFile();
- file.renameTo(renamed.getAbsoluteFile());
-
- // should consume the file again
- assertMockEndpointsSatisfied();
+ };
}
public void testNotAlwaysConsume() throws Exception {
- context.addRoutes(new RouteBuilder() {
- public void configure() throws Exception {
-
from("file://target/alwaysconsume/?consumer.alwaysConsume=false&moveNamePrefix=done/").to("mock:result");
- }
- });
-
// consume the file the first time
MockEndpoint mock = getMockEndpoint("mock:result");
mock.expectedBodiesReceived("Hello World");
Modified:
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileAlwaysConsumeTest.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileAlwaysConsumeTest.java?rev=697472&r1=697471&r2=697472&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileAlwaysConsumeTest.java
(original)
+++
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileAlwaysConsumeTest.java
Sun Sep 21 04:02:51 2008
@@ -23,7 +23,7 @@
import org.apache.camel.component.mock.MockEndpoint;
/**
- * Unit test for the alwaysConsume option.
+ * Unit test for the alwaysConsume=true option.
*/
public class FileAlwaysConsumeTest extends ContextTestSupport {
@@ -34,13 +34,17 @@
template.sendBodyAndHeader("file://target/alwaysconsume/", "Hello
World", FileComponent.HEADER_FILE_NAME, "report.txt");
}
- public void testAlwaysConsume() throws Exception {
- context.addRoutes(new RouteBuilder() {
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
public void configure() throws Exception {
from("file://target/alwaysconsume/?consumer.alwaysConsume=true&moveNamePrefix=done/").to("mock:result");
}
- });
+ };
+
+ }
+ public void testAlwaysConsume() throws Exception {
// consume the file the first time
MockEndpoint mock = getMockEndpoint("mock:result");
mock.expectedBodiesReceived("Hello World");
@@ -65,35 +69,4 @@
assertMockEndpointsSatisfied();
}
- public void testNotAlwaysConsume() throws Exception {
- context.addRoutes(new RouteBuilder() {
- public void configure() throws Exception {
-
from("file://target/alwaysconsume/?consumer.alwaysConsume=false&moveNamePrefix=done/").to("mock:result");
- }
- });
-
- // consume the file the first time
- MockEndpoint mock = getMockEndpoint("mock:result");
- mock.expectedBodiesReceived("Hello World");
- mock.expectedMessageCount(1);
-
- assertMockEndpointsSatisfied();
-
- Thread.sleep(1000);
-
- // reset mock and set new expectations
- mock.reset();
- mock.expectedMessageCount(0);
-
- // move file back
- File file = new File("target/alwaysconsume/done/report.txt");
- File renamed = new File("target/alwaysconsume/report.txt");
- file = file.getAbsoluteFile();
- file.renameTo(renamed.getAbsoluteFile());
-
- // should NOT consume the file again, let 2 secs pass to let the
consuemr try to consume it but it should not
- Thread.sleep(2000);
- assertMockEndpointsSatisfied();
- }
-
}
\ No newline at end of file