Author: davsclaus
Date: Sun Dec 28 02:06:49 2008
New Revision: 729714
URL: http://svn.apache.org/viewvc?rev=729714&view=rev
Log:
CAMEL-1154: Added idempotent to camel-ftp
Added:
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerIdempotentRefTest.java
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerIdempotentTest.java
- copied, changed from r729480,
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpNoopTest.java
Modified:
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileEndpoint.java
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileProducer.java
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpNoopTest.java
Modified:
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java?rev=729714&r1=729713&r2=729714&view=diff
==============================================================================
---
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
(original)
+++
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
Sun Dec 28 02:06:49 2008
@@ -193,23 +193,29 @@
/**
* Strategy when the file was processed and a commit should be executed.
*
- * @param remoteFileProcessStrategy the strategy to perform the commit
- * @param exchange the exchange
- * @param remoteFile the file processed
- * @param failureHandled is <tt>false</tt> if the exchange was
processed succesfully, <tt>true</tt> if
- * an exception occured during processing
but it was handled by the failure processor (usually the
- * DeadLetterChannel).
+ * @param processStrategy the strategy to perform the commit
+ * @param exchange the exchange
+ * @param file the file processed
+ * @param failureHandled is <tt>false</tt> if the exchange was processed
succesfully, <tt>true</tt> if
+ * an exception occured during processing but it
was handled by the failure processor (usually the
+ * DeadLetterChannel).
*/
- protected void processStrategyCommit(RemoteFileProcessStrategy
remoteFileProcessStrategy, RemoteFileExchange exchange,
- RemoteFile remoteFile, boolean
failureHandled) {
+ protected void processStrategyCommit(RemoteFileProcessStrategy
processStrategy, RemoteFileExchange exchange,
+ RemoteFile file, boolean
failureHandled) {
+ if (endpoint.isIdempotent()) {
+ // only add to idempotent repository if we could process the file
+ // use file.getAbsoluteFileName as key for the idempotent
repository to support files with same name but in different folders
+
endpoint.getIdempotentRepository().add(file.getAbsolutelFileName());
+ }
+
try {
if (log.isDebugEnabled()) {
- log.debug("Committing remote file strategy: " +
remoteFileProcessStrategy + " for file: "
- + remoteFile + (failureHandled ? " that was handled by
the failure processor." : ""));
+ log.debug("Committing remote file strategy: " +
processStrategy + " for file: "
+ + file + (failureHandled ? " that was handled by the
failure processor." : ""));
}
- remoteFileProcessStrategy.commit(operations, endpoint, exchange,
remoteFile);
+ processStrategy.commit(operations, endpoint, exchange, file);
} catch (Exception e) {
- log.warn("Error committing remote file strategy: " +
remoteFileProcessStrategy, e);
+ log.warn("Error committing remote file strategy: " +
processStrategy, e);
handleException(e);
}
}
@@ -217,16 +223,16 @@
/**
* Strategy when the file was not processed and a rollback should be
executed.
*
- * @param remoteFileProcessStrategy the strategy to perform the commit
- * @param exchange the exchange
- * @param remoteFile the file processed
+ * @param processStrategy the strategy to perform the commit
+ * @param exchange the exchange
+ * @param file the file processed
*/
- protected void processStrategyRollback(RemoteFileProcessStrategy
remoteFileProcessStrategy, RemoteFileExchange exchange,
- RemoteFile remoteFile) {
+ protected void processStrategyRollback(RemoteFileProcessStrategy
processStrategy, RemoteFileExchange exchange,
+ RemoteFile file) {
if (log.isDebugEnabled()) {
- log.debug("Rolling back remote file strategy: " +
remoteFileProcessStrategy + " for file: " + remoteFile);
+ log.debug("Rolling back remote file strategy: " + processStrategy
+ " for file: " + file);
}
- remoteFileProcessStrategy.rollback(operations, endpoint, exchange,
remoteFile);
+ processStrategy.rollback(operations, endpoint, exchange, file);
}
/**
@@ -242,6 +248,10 @@
log.trace("Remote file did not match. Will skip this remote
file: " + file);
}
return false;
+ } else if (endpoint.isIdempotent() &&
endpoint.getIdempotentRepository().contains(file.getAbsolutelFileName())) {
+ // use file.getAbsoluteFileName as key for the idempotent
repository to support files with same name but in different folders
+ log.warn("RemoteFileConsumer is idempotent and the file has been
consumed before. Will skip this remote file: " + file);
+ return false;
}
// file matched
Modified:
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileEndpoint.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileEndpoint.java?rev=729714&r1=729713&r2=729714&view=diff
==============================================================================
---
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileEndpoint.java
(original)
+++
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileEndpoint.java
Sun Dec 28 02:06:49 2008
@@ -28,6 +28,8 @@
import org.apache.camel.component.file.FileComponent;
import org.apache.camel.impl.ScheduledPollEndpoint;
import org.apache.camel.language.simple.FileLanguage;
+import org.apache.camel.processor.idempotent.MemoryIdempotentRepository;
+import org.apache.camel.spi.IdempotentRepository;
import org.apache.camel.util.FactoryFinder;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.UuidGenerator;
@@ -41,9 +43,10 @@
private static final transient Log LOG =
LogFactory.getLog(RemoteFileEndpoint.class);
private static final transient String DEFAULT_STRATEGYFACTORY_CLASS =
"org.apache.camel.component.file.remote.strategy.RemoteFileProcessStrategyFactory";
+ private static final transient int DEFAULT_IDEMPOTENT_CACHE_SIZE = 1000;
- private RemoteFileProcessStrategy remoteFileProcessStrategy;
- private RemoteFileOperations remoteFileOperations;
+ private RemoteFileProcessStrategy processStrategy;
+ private RemoteFileOperations operations;
private RemoteFileConfiguration configuration;
private boolean noop;
private String tempPrefix;
@@ -59,6 +62,8 @@
private boolean delete;
private Expression expression;
private Expression preMoveExpression;
+ private boolean idempotent;
+ private IdempotentRepository idempotentRepository;
private RemoteFileFilter filter;
private Comparator<RemoteFile> sorter;
private Comparator<RemoteFileExchange> sortBy;
@@ -66,9 +71,9 @@
private String readLock = "none";
private long readLockTimeout;
- public RemoteFileEndpoint(String uri, RemoteFileComponent component,
RemoteFileOperations remoteFileOperations, RemoteFileConfiguration
configuration) {
+ public RemoteFileEndpoint(String uri, RemoteFileComponent component,
RemoteFileOperations operations, RemoteFileConfiguration configuration) {
super(uri, component);
- this.remoteFileOperations = remoteFileOperations;
+ this.operations = operations;
this.configuration = configuration;
}
@@ -81,18 +86,18 @@
}
public RemoteFileProducer createProducer() throws Exception {
- return new RemoteFileProducer(this, remoteFileOperations);
+ return new RemoteFileProducer(this, operations);
}
public RemoteFileConsumer createConsumer(Processor processor) throws
Exception {
String protocol = getConfiguration().getProtocol();
ObjectHelper.notEmpty(protocol, "protocol");
- RemoteFileConsumer consumer = null;
+ RemoteFileConsumer consumer;
if ("ftp".equals(protocol)) {
- consumer = new FtpConsumer(this, processor, remoteFileOperations);
+ consumer = new FtpConsumer(this, processor, operations);
} else if ("sftp".equals(protocol)) {
- consumer = new SftpConsumer(this, processor, remoteFileOperations);
+ consumer = new SftpConsumer(this, processor, operations);
} else {
throw new IllegalArgumentException("Unsupported protocol: " +
protocol);
}
@@ -101,6 +106,18 @@
throw new IllegalArgumentException("You cannot set delete=true and
a moveNamePrefix, moveNamePostfix or expression option");
}
+ // if noop=true then idempotent should also be configured
+ if (isNoop() && !isIdempotent()) {
+ LOG.info("Endpoint is configured with noop=true so forcing
endpoint to be idempotent as well");
+ setIdempotent(true);
+ }
+
+ // if idempotent and no repository set then create a default one
+ if (isIdempotent() && idempotentRepository == null) {
+ LOG.info("Using default memory based idempotent repository with
cache max size: " + DEFAULT_IDEMPOTENT_CACHE_SIZE);
+ idempotentRepository =
MemoryIdempotentRepository.memoryIdempotentRepository(DEFAULT_IDEMPOTENT_CACHE_SIZE);
+ }
+
configureConsumer(consumer);
return consumer;
}
@@ -121,15 +138,17 @@
}
public RemoteFileProcessStrategy getRemoteFileProcessStrategy() {
- if (remoteFileProcessStrategy == null) {
- remoteFileProcessStrategy = createRemoteFileStrategy();
- LOG.debug("Using remote file process strategy: " +
remoteFileProcessStrategy);
+ if (processStrategy == null) {
+ processStrategy = createRemoteFileStrategy();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Using remote file process strategy: " +
processStrategy);
+ }
}
- return remoteFileProcessStrategy;
+ return processStrategy;
}
public void setRemoteFileProcessStrategy(RemoteFileProcessStrategy
remoteFileProcessStrategy) {
- this.remoteFileProcessStrategy = remoteFileProcessStrategy;
+ this.processStrategy = remoteFileProcessStrategy;
}
public boolean isNoop() {
@@ -250,6 +269,22 @@
this.preMoveExpression = FileLanguage.file(fileLanguageExpression);
}
+ public boolean isIdempotent() {
+ return idempotent;
+ }
+
+ public void setIdempotent(boolean idempotent) {
+ this.idempotent = idempotent;
+ }
+
+ public IdempotentRepository getIdempotentRepository() {
+ return idempotentRepository;
+ }
+
+ public void setIdempotentRepository(IdempotentRepository
idempotentRepository) {
+ this.idempotentRepository = idempotentRepository;
+ }
+
public RemoteFileFilter getFilter() {
return filter;
}
Modified:
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileProducer.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileProducer.java?rev=729714&r1=729713&r2=729714&view=diff
==============================================================================
---
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileProducer.java
(original)
+++
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileProducer.java
Sun Dec 28 02:06:49 2008
@@ -35,13 +35,13 @@
public class RemoteFileProducer extends DefaultProducer {
private static final transient Log LOG =
LogFactory.getLog(RemoteFileProducer.class);
private RemoteFileEndpoint endpoint;
- private RemoteFileOperations ftp;
+ private RemoteFileOperations operations;
private boolean loggedIn;
- protected RemoteFileProducer(RemoteFileEndpoint endpoint,
RemoteFileOperations ftp) {
+ protected RemoteFileProducer(RemoteFileEndpoint endpoint,
RemoteFileOperations operations) {
super(endpoint);
this.endpoint = endpoint;
- this.ftp = ftp;
+ this.operations = operations;
}
public void process(Exchange exchange) throws Exception {
@@ -82,7 +82,7 @@
if (LOG.isTraceEnabled()) {
LOG.trace("Renaming file: " + tempTarget + " to: " +
target);
}
- boolean renamed = ftp.renameFile(tempTarget, target);
+ boolean renamed = operations.renameFile(tempTarget, target);
if (!renamed) {
throw new RemoteFileOperationFailedException("Cannot
rename file from: " + tempTarget + " to: " + target);
}
@@ -112,7 +112,7 @@
int lastPathIndex = fileName.lastIndexOf('/');
if (lastPathIndex != -1) {
String directory = fileName.substring(0, lastPathIndex);
- if (!ftp.buildDirectory(directory)) {
+ if (!operations.buildDirectory(directory)) {
LOG.warn("Couldn't build directory: " + directory + "
(could be because of denied permissions)");
}
}
@@ -122,7 +122,7 @@
LOG.trace("About to send: " + fileName + " to: " +
remoteServer() + " from exchange: " + exchange);
}
- boolean success = ftp.storeFile(fileName, payload);
+ boolean success = operations.storeFile(fileName, payload);
if (!success) {
throw new RemoteFileOperationFailedException("Error sending
file: " + fileName + " to: " + remoteServer());
}
@@ -208,11 +208,11 @@
}
protected void connectIfNecessary() throws IOException {
- if (!ftp.isConnected() || !loggedIn) {
+ if (!operations.isConnected() || !loggedIn) {
if (LOG.isDebugEnabled()) {
LOG.debug("Not connected/logged in, connecting to " +
remoteServer());
}
- loggedIn = ftp.connect(endpoint.getConfiguration());
+ loggedIn = operations.connect(endpoint.getConfiguration());
if (!loggedIn) {
return;
}
@@ -225,7 +225,7 @@
if (LOG.isDebugEnabled()) {
LOG.debug("Disconnecting from " + remoteServer());
}
- ftp.disconnect();
+ operations.disconnect();
}
protected String remoteServer() {
Modified:
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java?rev=729714&r1=729713&r2=729714&view=diff
==============================================================================
---
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java
(original)
+++
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java
Sun Dec 28 02:06:49 2008
@@ -26,8 +26,8 @@
*/
public class SftpConsumer extends RemoteFileConsumer {
- public SftpConsumer(RemoteFileEndpoint endpoint, Processor processor,
RemoteFileOperations remoteFileOperations) {
- super(endpoint, processor, remoteFileOperations);
+ public SftpConsumer(RemoteFileEndpoint endpoint, Processor processor,
RemoteFileOperations operations) {
+ super(endpoint, processor, operations);
}
protected void pollDirectory(String fileName, boolean processDir,
List<RemoteFile> fileList) {
Modified:
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpNoopTest.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpNoopTest.java?rev=729714&r1=729713&r2=729714&view=diff
==============================================================================
---
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpNoopTest.java
(original)
+++
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpNoopTest.java
Sun Dec 28 02:06:49 2008
@@ -58,9 +58,8 @@
public void testNoop() throws Exception {
MockEndpoint mock = getMockEndpoint("mock:result");
- // we should be able to poll the file more than once since its noop
- mock.expectedMinimumMessageCount(2);
- mock.setResultWaitTime(5000);
+ // we should not be able to poll the file more than once since its
noop and idempotent
+ mock.expectedMessageCount(1);
mock.assertIsSatisfied();
Added:
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerIdempotentRefTest.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerIdempotentRefTest.java?rev=729714&view=auto
==============================================================================
---
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerIdempotentRefTest.java
(added)
+++
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerIdempotentRefTest.java
Sun Dec 28 02:06:49 2008
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file.remote;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.file.FileComponent;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.JndiRegistry;
+import org.apache.camel.spi.IdempotentRepository;
+
+/**
+ * Unit test for the idempotentRepository # option.
+ */
+public class FtpConsumerIdempotentRefTest extends FtpServerTestSupport {
+
+ private static boolean invoked;
+
+ private int port = 20078;
+ private String ftpUrl = "ftp://ad...@localhost:" + port
+ +
"/idempotent?password=admin&binary=false&idempotent=true&idempotentRepository=#myRepo&delete=true";
+
+ public int getPort() {
+ return port;
+ }
+
+ @Override
+ protected JndiRegistry createRegistry() throws Exception {
+ JndiRegistry jndi = super.createRegistry();
+ jndi.bind("myRepo", new MyIdempotentRepository());
+ return jndi;
+ }
+
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ public void configure() throws Exception {
+ from(ftpUrl).to("mock:result");
+ }
+ };
+ }
+
+ public void testIdempotent() throws Exception {
+ // consume the file the first time
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedBodiesReceived("Hello World");
+ mock.expectedMessageCount(1);
+
+ template.sendBodyAndHeader(ftpUrl, "Hello World",
FileComponent.HEADER_FILE_NAME, "report.txt");
+
+ assertMockEndpointsSatisfied();
+
+ Thread.sleep(100);
+
+ // reset mock and set new expectations
+ mock.reset();
+ mock.expectedMessageCount(0);
+
+ // move file back
+ template.sendBodyAndHeader(ftpUrl, "Hello World",
FileComponent.HEADER_FILE_NAME, "report.txt");
+
+ // should NOT consume the file again, let 2 secs pass to let the
consumer try to consume it but it should not
+ Thread.sleep(2000);
+ assertMockEndpointsSatisfied();
+
+ assertTrue("MyIdempotentRepository should have been invoked", invoked);
+ }
+
+ public class MyIdempotentRepository implements
IdempotentRepository<String> {
+
+ public boolean add(String messageId) {
+ // will return true 1st time, and false 2nd time
+ boolean result = invoked;
+ invoked = true;
+ assertEquals("report.txt", messageId);
+ return !result;
+ }
+
+ public boolean contains(String key) {
+ return false;
+ }
+ }
+
+}
\ No newline at end of file
Copied:
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerIdempotentTest.java
(from r729480,
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpNoopTest.java)
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerIdempotentTest.java?p2=activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerIdempotentTest.java&p1=activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpNoopTest.java&r1=729480&r2=729714&rev=729714&view=diff
==============================================================================
---
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpNoopTest.java
(original)
+++
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerIdempotentTest.java
Sun Dec 28 02:06:49 2008
@@ -16,22 +16,18 @@
*/
package org.apache.camel.component.file.remote;
-import java.io.File;
-
-import org.apache.camel.Endpoint;
-import org.apache.camel.Exchange;
-import org.apache.camel.Producer;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.file.FileComponent;
import org.apache.camel.component.mock.MockEndpoint;
/**
- * Unit test to test noop option.
+ * Unit test for the idempotent=true option.
*/
-public class FromFtpNoopTest extends FtpServerTestSupport {
+public class FtpConsumerIdempotentTest extends FtpServerTestSupport {
- private int port = 20066;
- private String ftpUrl = "ftp://ad...@localhost:" + port +
"/noop?password=admin&binary=false&noop=true";
+ private int port = 20077;
+ private String ftpUrl = "ftp://ad...@localhost:" + port
+ +
"/idempotent?password=admin&binary=false&idempotent=true&delete=true";
public int getPort() {
return port;
@@ -40,36 +36,10 @@
@Override
protected void setUp() throws Exception {
super.setUp();
- prepareFtpServer();
- }
-
- private void prepareFtpServer() throws Exception {
- // prepares the FTP Server by creating a file on the server that we
want to unit
- // test that we can pool and store as a local file
- Endpoint endpoint = context.getEndpoint(ftpUrl);
- Exchange exchange = endpoint.createExchange();
- exchange.getIn().setBody("Hello World");
- exchange.getIn().setHeader(FileComponent.HEADER_FILE_NAME,
"hello.txt");
- Producer producer = endpoint.createProducer();
- producer.start();
- producer.process(exchange);
- producer.stop();
- }
-
- public void testNoop() throws Exception {
- MockEndpoint mock = getMockEndpoint("mock:result");
- // we should be able to poll the file more than once since its noop
- mock.expectedMinimumMessageCount(2);
- mock.setResultWaitTime(5000);
-
- mock.assertIsSatisfied();
-
- // assert the file is still there
- File file = new File("./res/home/noop/hello.txt");
- file = file.getAbsoluteFile();
- assertTrue("The file should exists", file.exists());
+ deleteDirectory("target/idempotent");
}
+ @Override
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
public void configure() throws Exception {
@@ -78,4 +48,28 @@
};
}
+ public void testIdempotent() throws Exception {
+ // consume the file the first time
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedBodiesReceived("Hello World");
+ mock.expectedMessageCount(1);
+
+ template.sendBodyAndHeader(ftpUrl, "Hello World",
FileComponent.HEADER_FILE_NAME, "report.txt");
+
+ assertMockEndpointsSatisfied();
+
+ Thread.sleep(100);
+
+ // reset mock and set new expectations
+ mock.reset();
+ mock.expectedMessageCount(0);
+
+ // move file back
+ template.sendBodyAndHeader(ftpUrl, "Hello World",
FileComponent.HEADER_FILE_NAME, "report.txt");
+
+ // should NOT consume the file again, let 2 secs pass to let the
consumer try to consume it but it should not
+ Thread.sleep(2000);
+ assertMockEndpointsSatisfied();
+ }
+
}
\ No newline at end of file