Author: davsclaus
Date: Sun Dec 28 02:06:49 2008
New Revision: 729714

URL: http://svn.apache.org/viewvc?rev=729714&view=rev
Log:
CAMEL-1154: Added idempotent to camel-ftp

Added:
    
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerIdempotentRefTest.java
    
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerIdempotentTest.java
      - copied, changed from r729480, 
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpNoopTest.java
Modified:
    
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
    
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileEndpoint.java
    
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileProducer.java
    
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java
    
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpNoopTest.java

Modified: 
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java?rev=729714&r1=729713&r2=729714&view=diff
==============================================================================
--- 
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
 (original)
+++ 
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
 Sun Dec 28 02:06:49 2008
@@ -193,23 +193,29 @@
     /**
      * Strategy when the file was processed and a commit should be executed.
      *
-     * @param remoteFileProcessStrategy the strategy to perform the commit
-     * @param exchange                  the exchange
-     * @param remoteFile                the file processed
-     * @param failureHandled            is <tt>false</tt> if the exchange was 
processed succesfully, <tt>true</tt> if
-     *                                  an exception occured during processing 
but it was handled by the failure processor (usually the
-     *                                  DeadLetterChannel).
+     * @param processStrategy  the strategy to perform the commit
+     * @param exchange         the exchange
+     * @param file             the file processed
+     * @param failureHandled   is <tt>false</tt> if the exchange was processed 
succesfully, <tt>true</tt> if
+     *                         an exception occured during processing but it 
was handled by the failure processor (usually the
+     *                         DeadLetterChannel).
      */
-    protected void processStrategyCommit(RemoteFileProcessStrategy 
remoteFileProcessStrategy, RemoteFileExchange exchange,
-                                         RemoteFile remoteFile, boolean 
failureHandled) {
+    protected void processStrategyCommit(RemoteFileProcessStrategy 
processStrategy, RemoteFileExchange exchange,
+                                         RemoteFile file, boolean 
failureHandled) {
+        if (endpoint.isIdempotent()) {
+            // only add to idempotent repository if we could process the file
+            // use file.getAbsoluteFileName as key for the idempotent 
repository to support files with same name but in different folders
+            
endpoint.getIdempotentRepository().add(file.getAbsolutelFileName());
+        }
+
         try {
             if (log.isDebugEnabled()) {
-                log.debug("Committing remote file strategy: " + 
remoteFileProcessStrategy + " for file: "
-                        + remoteFile + (failureHandled ? " that was handled by 
the failure processor." : ""));
+                log.debug("Committing remote file strategy: " + 
processStrategy + " for file: "
+                        + file + (failureHandled ? " that was handled by the 
failure processor." : ""));
             }
-            remoteFileProcessStrategy.commit(operations, endpoint, exchange, 
remoteFile);
+            processStrategy.commit(operations, endpoint, exchange, file);
         } catch (Exception e) {
-            log.warn("Error committing remote file strategy: " + 
remoteFileProcessStrategy, e);
+            log.warn("Error committing remote file strategy: " + 
processStrategy, e);
             handleException(e);
         }
     }
@@ -217,16 +223,16 @@
     /**
      * Strategy when the file was not processed and a rollback should be 
executed.
      *
-     * @param remoteFileProcessStrategy the strategy to perform the commit
-     * @param exchange                  the exchange
-     * @param remoteFile                the file processed
+     * @param processStrategy  the strategy to perform the commit
+     * @param exchange         the exchange
+     * @param file             the file processed
      */
-    protected void processStrategyRollback(RemoteFileProcessStrategy 
remoteFileProcessStrategy, RemoteFileExchange exchange,
-                                           RemoteFile remoteFile) {
+    protected void processStrategyRollback(RemoteFileProcessStrategy 
processStrategy, RemoteFileExchange exchange,
+                                           RemoteFile file) {
         if (log.isDebugEnabled()) {
-            log.debug("Rolling back remote file strategy: " + 
remoteFileProcessStrategy + " for file: " + remoteFile);
+            log.debug("Rolling back remote file strategy: " + processStrategy 
+ " for file: " + file);
         }
-        remoteFileProcessStrategy.rollback(operations, endpoint, exchange, 
remoteFile);
+        processStrategy.rollback(operations, endpoint, exchange, file);
     }
 
     /**
@@ -242,6 +248,10 @@
                 log.trace("Remote file did not match. Will skip this remote 
file: " + file);
             }
             return false;
+        } else if (endpoint.isIdempotent() && 
endpoint.getIdempotentRepository().contains(file.getAbsolutelFileName())) {
+            // use file.getAbsoluteFileName as key for the idempotent 
repository to support files with same name but in different folders
+            log.warn("RemoteFileConsumer is idempotent and the file has been 
consumed before. Will skip this remote file: " + file);
+            return false;
         }
 
         // file matched

Modified: 
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileEndpoint.java
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileEndpoint.java?rev=729714&r1=729713&r2=729714&view=diff
==============================================================================
--- 
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileEndpoint.java
 (original)
+++ 
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileEndpoint.java
 Sun Dec 28 02:06:49 2008
@@ -28,6 +28,8 @@
 import org.apache.camel.component.file.FileComponent;
 import org.apache.camel.impl.ScheduledPollEndpoint;
 import org.apache.camel.language.simple.FileLanguage;
+import org.apache.camel.processor.idempotent.MemoryIdempotentRepository;
+import org.apache.camel.spi.IdempotentRepository;
 import org.apache.camel.util.FactoryFinder;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.UuidGenerator;
@@ -41,9 +43,10 @@
     private static final transient Log LOG = 
LogFactory.getLog(RemoteFileEndpoint.class);
     private static final transient String DEFAULT_STRATEGYFACTORY_CLASS =
             
"org.apache.camel.component.file.remote.strategy.RemoteFileProcessStrategyFactory";
+    private static final transient int DEFAULT_IDEMPOTENT_CACHE_SIZE = 1000;
 
-    private RemoteFileProcessStrategy remoteFileProcessStrategy;
-    private RemoteFileOperations remoteFileOperations;
+    private RemoteFileProcessStrategy processStrategy;
+    private RemoteFileOperations operations;
     private RemoteFileConfiguration configuration;
     private boolean noop;
     private String tempPrefix;
@@ -59,6 +62,8 @@
     private boolean delete;
     private Expression expression;
     private Expression preMoveExpression;
+    private boolean idempotent;
+    private IdempotentRepository idempotentRepository;
     private RemoteFileFilter filter;
     private Comparator<RemoteFile> sorter;
     private Comparator<RemoteFileExchange> sortBy;
@@ -66,9 +71,9 @@
     private String readLock = "none";
     private long readLockTimeout;
 
-    public RemoteFileEndpoint(String uri, RemoteFileComponent component, 
RemoteFileOperations remoteFileOperations, RemoteFileConfiguration 
configuration) {
+    public RemoteFileEndpoint(String uri, RemoteFileComponent component, 
RemoteFileOperations operations, RemoteFileConfiguration configuration) {
         super(uri, component);
-        this.remoteFileOperations = remoteFileOperations;
+        this.operations = operations;
         this.configuration = configuration;
     }
 
@@ -81,18 +86,18 @@
     }
 
     public RemoteFileProducer createProducer() throws Exception {
-        return new RemoteFileProducer(this, remoteFileOperations);
+        return new RemoteFileProducer(this, operations);
     }
 
     public RemoteFileConsumer createConsumer(Processor processor) throws 
Exception {
         String protocol = getConfiguration().getProtocol();
         ObjectHelper.notEmpty(protocol, "protocol");
 
-        RemoteFileConsumer consumer = null;
+        RemoteFileConsumer consumer;
         if ("ftp".equals(protocol)) {
-            consumer = new FtpConsumer(this, processor, remoteFileOperations);
+            consumer = new FtpConsumer(this, processor, operations);
         } else if ("sftp".equals(protocol)) {
-            consumer = new SftpConsumer(this, processor, remoteFileOperations);
+            consumer = new SftpConsumer(this, processor, operations);
         } else {
             throw new IllegalArgumentException("Unsupported protocol: " + 
protocol);
         }
@@ -101,6 +106,18 @@
             throw new IllegalArgumentException("You cannot set delete=true and 
a moveNamePrefix, moveNamePostfix or expression option");
         }
 
+        // if noop=true then idempotent should also be configured
+        if (isNoop() && !isIdempotent()) {
+            LOG.info("Endpoint is configured with noop=true so forcing 
endpoint to be idempotent as well");
+            setIdempotent(true);
+        }
+
+        // if idempotent and no repository set then create a default one
+        if (isIdempotent() && idempotentRepository == null) {
+            LOG.info("Using default memory based idempotent repository with 
cache max size: " + DEFAULT_IDEMPOTENT_CACHE_SIZE);
+            idempotentRepository = 
MemoryIdempotentRepository.memoryIdempotentRepository(DEFAULT_IDEMPOTENT_CACHE_SIZE);
+        }
+
         configureConsumer(consumer);
         return consumer;
     }
@@ -121,15 +138,17 @@
     }
 
     public RemoteFileProcessStrategy getRemoteFileProcessStrategy() {
-        if (remoteFileProcessStrategy == null) {
-            remoteFileProcessStrategy = createRemoteFileStrategy();
-            LOG.debug("Using remote file process strategy: " + 
remoteFileProcessStrategy);
+        if (processStrategy == null) {
+            processStrategy = createRemoteFileStrategy();
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Using remote file process strategy: " + 
processStrategy);
+            }
         }
-        return remoteFileProcessStrategy;
+        return processStrategy;
     }
 
     public void setRemoteFileProcessStrategy(RemoteFileProcessStrategy 
remoteFileProcessStrategy) {
-        this.remoteFileProcessStrategy = remoteFileProcessStrategy;
+        this.processStrategy = remoteFileProcessStrategy;
     }
 
     public boolean isNoop() {
@@ -250,6 +269,22 @@
         this.preMoveExpression = FileLanguage.file(fileLanguageExpression);
     }
 
+    public boolean isIdempotent() {
+        return idempotent;
+    }
+
+    public void setIdempotent(boolean idempotent) {
+        this.idempotent = idempotent;
+    }
+
+    public IdempotentRepository getIdempotentRepository() {
+        return idempotentRepository;
+    }
+
+    public void setIdempotentRepository(IdempotentRepository 
idempotentRepository) {
+        this.idempotentRepository = idempotentRepository;
+    }
+
     public RemoteFileFilter getFilter() {
         return filter;
     }

Modified: 
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileProducer.java
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileProducer.java?rev=729714&r1=729713&r2=729714&view=diff
==============================================================================
--- 
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileProducer.java
 (original)
+++ 
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileProducer.java
 Sun Dec 28 02:06:49 2008
@@ -35,13 +35,13 @@
 public class RemoteFileProducer extends DefaultProducer {
     private static final transient Log LOG = 
LogFactory.getLog(RemoteFileProducer.class);
     private RemoteFileEndpoint endpoint;
-    private RemoteFileOperations ftp;
+    private RemoteFileOperations operations;
     private boolean loggedIn;
 
-    protected RemoteFileProducer(RemoteFileEndpoint endpoint, 
RemoteFileOperations ftp) {
+    protected RemoteFileProducer(RemoteFileEndpoint endpoint, 
RemoteFileOperations operations) {
         super(endpoint);
         this.endpoint = endpoint;
-        this.ftp = ftp;
+        this.operations = operations;
     }
 
     public void process(Exchange exchange) throws Exception {
@@ -82,7 +82,7 @@
                 if (LOG.isTraceEnabled()) {
                     LOG.trace("Renaming file: " + tempTarget + " to: " + 
target);
                 }
-                boolean renamed = ftp.renameFile(tempTarget, target);
+                boolean renamed = operations.renameFile(tempTarget, target);
                 if (!renamed) {
                     throw new RemoteFileOperationFailedException("Cannot 
rename file from: " + tempTarget + " to: " + target);
                 }
@@ -112,7 +112,7 @@
             int lastPathIndex = fileName.lastIndexOf('/');
             if (lastPathIndex != -1) {
                 String directory = fileName.substring(0, lastPathIndex);
-                if (!ftp.buildDirectory(directory)) {
+                if (!operations.buildDirectory(directory)) {
                     LOG.warn("Couldn't build directory: " + directory + " 
(could be because of denied permissions)");
                 }
             }
@@ -122,7 +122,7 @@
                 LOG.trace("About to send: " + fileName + " to: " + 
remoteServer() + " from exchange: " + exchange);
             }
 
-            boolean success = ftp.storeFile(fileName, payload);
+            boolean success = operations.storeFile(fileName, payload);
             if (!success) {
                 throw new RemoteFileOperationFailedException("Error sending 
file: " + fileName + " to: " + remoteServer());
             }
@@ -208,11 +208,11 @@
     }
 
     protected void connectIfNecessary() throws IOException {
-        if (!ftp.isConnected() || !loggedIn) {
+        if (!operations.isConnected() || !loggedIn) {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Not connected/logged in, connecting to " + 
remoteServer());
             }
-            loggedIn = ftp.connect(endpoint.getConfiguration());
+            loggedIn = operations.connect(endpoint.getConfiguration());
             if (!loggedIn) {
                 return;
             }
@@ -225,7 +225,7 @@
         if (LOG.isDebugEnabled()) {
             LOG.debug("Disconnecting from " + remoteServer());
         }
-        ftp.disconnect();
+        operations.disconnect();
     }
 
     protected String remoteServer() {

Modified: 
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java?rev=729714&r1=729713&r2=729714&view=diff
==============================================================================
--- 
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java
 (original)
+++ 
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java
 Sun Dec 28 02:06:49 2008
@@ -26,8 +26,8 @@
  */
 public class SftpConsumer extends RemoteFileConsumer {
 
-    public SftpConsumer(RemoteFileEndpoint endpoint, Processor processor, 
RemoteFileOperations remoteFileOperations) {
-        super(endpoint, processor, remoteFileOperations);
+    public SftpConsumer(RemoteFileEndpoint endpoint, Processor processor, 
RemoteFileOperations operations) {
+        super(endpoint, processor, operations);
     }
 
     protected void pollDirectory(String fileName, boolean processDir, 
List<RemoteFile> fileList) {

Modified: 
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpNoopTest.java
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpNoopTest.java?rev=729714&r1=729713&r2=729714&view=diff
==============================================================================
--- 
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpNoopTest.java
 (original)
+++ 
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpNoopTest.java
 Sun Dec 28 02:06:49 2008
@@ -58,9 +58,8 @@
 
     public void testNoop() throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:result");
-        // we should be able to poll the file more than once since its noop
-        mock.expectedMinimumMessageCount(2);
-        mock.setResultWaitTime(5000);
+        // we should not be able to poll the file more than once since its 
noop and idempotent
+        mock.expectedMessageCount(1);
         
         mock.assertIsSatisfied();
 

Added: 
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerIdempotentRefTest.java
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerIdempotentRefTest.java?rev=729714&view=auto
==============================================================================
--- 
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerIdempotentRefTest.java
 (added)
+++ 
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerIdempotentRefTest.java
 Sun Dec 28 02:06:49 2008
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file.remote;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.file.FileComponent;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.JndiRegistry;
+import org.apache.camel.spi.IdempotentRepository;
+
+/**
+ * Unit test for the idempotentRepository # option.
+ */
+public class FtpConsumerIdempotentRefTest extends FtpServerTestSupport {
+
+    private static boolean invoked;
+
+    private int port = 20078;
+    private String ftpUrl = "ftp://admin@localhost:" + port
+            + 
"/idempotent?password=admin&binary=false&idempotent=true&idempotentRepository=#myRepo&delete=true";
+
+    public int getPort() {
+        return port;
+    }
+
+    @Override
+    protected JndiRegistry createRegistry() throws Exception {
+        JndiRegistry jndi = super.createRegistry();
+        jndi.bind("myRepo", new MyIdempotentRepository());
+        return jndi;
+    }
+
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() throws Exception {
+                from(ftpUrl).to("mock:result");
+            }
+        };
+    }
+
+    public void testIdempotent() throws Exception {
+        // consume the file the first time
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedBodiesReceived("Hello World");
+        mock.expectedMessageCount(1);
+
+        template.sendBodyAndHeader(ftpUrl, "Hello World", 
FileComponent.HEADER_FILE_NAME, "report.txt");
+
+        assertMockEndpointsSatisfied();
+
+        Thread.sleep(100);
+
+        // reset mock and set new expectations
+        mock.reset();
+        mock.expectedMessageCount(0);
+
+        // move file back
+        template.sendBodyAndHeader(ftpUrl, "Hello World", 
FileComponent.HEADER_FILE_NAME, "report.txt");
+
+        // should NOT consume the file again, let 2 secs pass to let the 
consumer try to consume it but it should not
+        Thread.sleep(2000);
+        assertMockEndpointsSatisfied();
+
+        assertTrue("MyIdempotentRepository should have been invoked", invoked);
+    }
+
+    public class MyIdempotentRepository implements 
IdempotentRepository<String> {
+
+        public boolean add(String messageId) {
+            // will return true 1st time, and false 2nd time
+            boolean result = invoked;
+            invoked = true;
+            assertEquals("report.txt", messageId);
+            return !result;
+        }
+
+        public boolean contains(String key) {
+            return false;
+        }
+    }
+
+}
\ No newline at end of file

Copied: 
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerIdempotentTest.java
 (from r729480, 
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpNoopTest.java)
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerIdempotentTest.java?p2=activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerIdempotentTest.java&p1=activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpNoopTest.java&r1=729480&r2=729714&rev=729714&view=diff
==============================================================================
--- 
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpNoopTest.java
 (original)
+++ 
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpConsumerIdempotentTest.java
 Sun Dec 28 02:06:49 2008
@@ -16,22 +16,18 @@
  */
 package org.apache.camel.component.file.remote;
 
-import java.io.File;
-
-import org.apache.camel.Endpoint;
-import org.apache.camel.Exchange;
-import org.apache.camel.Producer;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.file.FileComponent;
 import org.apache.camel.component.mock.MockEndpoint;
 
 /**
- * Unit test to test noop option.
+ * Unit test for the idempotent=true option.
  */
-public class FromFtpNoopTest extends FtpServerTestSupport {
+public class FtpConsumerIdempotentTest extends FtpServerTestSupport {
 
-    private int port = 20066;
-    private String ftpUrl = "ftp://admin@localhost:" + port + 
"/noop?password=admin&binary=false&noop=true";
+    private int port = 20077;
+    private String ftpUrl = "ftp://admin@localhost:" + port
+            + 
"/idempotent?password=admin&binary=false&idempotent=true&delete=true";
 
     public int getPort() {
         return port;
@@ -40,36 +36,10 @@
     @Override
     protected void setUp() throws Exception {
         super.setUp();
-        prepareFtpServer();
-    }
-
-    private void prepareFtpServer() throws Exception {
-        // prepares the FTP Server by creating a file on the server that we 
want to unit
-        // test that we can pool and store as a local file
-        Endpoint endpoint = context.getEndpoint(ftpUrl);
-        Exchange exchange = endpoint.createExchange();
-        exchange.getIn().setBody("Hello World");
-        exchange.getIn().setHeader(FileComponent.HEADER_FILE_NAME, 
"hello.txt");
-        Producer producer = endpoint.createProducer();
-        producer.start();
-        producer.process(exchange);
-        producer.stop();
-    }
-
-    public void testNoop() throws Exception {
-        MockEndpoint mock = getMockEndpoint("mock:result");
-        // we should be able to poll the file more than once since its noop
-        mock.expectedMinimumMessageCount(2);
-        mock.setResultWaitTime(5000);
-        
-        mock.assertIsSatisfied();
-
-        // assert the file is still there
-        File file = new File("./res/home/noop/hello.txt");
-        file = file.getAbsoluteFile();
-        assertTrue("The file should exists", file.exists());
+        deleteDirectory("target/idempotent");
     }
 
+    @Override
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
             public void configure() throws Exception {
@@ -78,4 +48,28 @@
         };
     }
 
+    public void testIdempotent() throws Exception {
+        // consume the file the first time
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedBodiesReceived("Hello World");
+        mock.expectedMessageCount(1);
+
+        template.sendBodyAndHeader(ftpUrl, "Hello World", 
FileComponent.HEADER_FILE_NAME, "report.txt");
+
+        assertMockEndpointsSatisfied();
+
+        Thread.sleep(100);
+
+        // reset mock and set new expectations
+        mock.reset();
+        mock.expectedMessageCount(0);
+
+        // move file back
+        template.sendBodyAndHeader(ftpUrl, "Hello World", 
FileComponent.HEADER_FILE_NAME, "report.txt");
+
+        // should NOT consume the file again, let 2 secs pass to let the 
consumer try to consume it but it should not
+        Thread.sleep(2000);
+        assertMockEndpointsSatisfied();
+    }
+
 }
\ No newline at end of file


Reply via email to