Author: davsclaus
Date: Sun Jul 27 01:48:09 2008
New Revision: 680078

URL: http://svn.apache.org/viewvc?rev=680078&view=rev
Log:
Polished camel-ftp. Added missing unit test. Trying to close the Apache FTP Server 
so Bamboo is happy. Better handling of exceptions thrown in the ftp consumer when 
it is stopping, etc.

Added:
    
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpUtils.java
    
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpNonExclusiveReadTest.java
      - copied, changed from r680012, 
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpExclusiveReadTest.java
    
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpPollFileOnlyTest.java
   (contents, props changed)
      - copied, changed from r679955, 
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpToAsciiFileTest.java
Modified:
    
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java
    
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java
    
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpEndpoint.java
    
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpOperationFailedException.java
    
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpProducer.java
    
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileBinding.java
    
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConverter.java
    
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java
    
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpProducer.java
    
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpExclusiveReadTest.java
    
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpProducerBuildDirectoryTest.java
    
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpServerTestSupport.java
    
activemq/camel/trunk/components/camel-ftp/src/test/resources/log4j.properties

Modified: 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java?rev=680078&r1=680077&r2=680078&view=diff
==============================================================================
--- 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java
 (original)
+++ 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java
 Sun Jul 27 01:48:09 2008
@@ -172,7 +172,7 @@
         // file is still in progress of being written (slow writer)
         // TODO: Seems to not work on Unix boxes (see the unit test 
FileExclusiveReadTest)
         String originalName = file.getAbsolutePath();
-        File newName = new File(originalName + ".exclusiveRead");
+        File newName = new File(originalName + ".camelExclusiveRead");
         boolean exclusive = false;
         while (! exclusive) {
             exclusive = file.renameTo(newName);

Modified: 
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java?rev=680078&r1=680077&r2=680078&view=diff
==============================================================================
--- 
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java
 (original)
+++ 
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java
 Sun Jul 27 01:48:09 2008
@@ -22,6 +22,7 @@
 
 import org.apache.camel.Processor;
 import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.component.file.FileComponent;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -55,17 +56,31 @@
         this.client = client;
     }
 
+    protected void doStart() throws Exception {
+        LOG.info("Starting");
+        super.doStart();
+    }
+
+    protected void doStop() throws Exception {
+        LOG.info("Stopping");
+        // disconnect when stopping
+        disconnect();
+        super.doStop();
+    }
+
     protected void connectIfNecessary() throws IOException {
         if (!client.isConnected()) {
-            LOG.debug("Not connected, trying to reconnect.");
-            endpoint.connect(client);
-            LOG.info("Connected to " + 
endpoint.getConfiguration().remoteServerInformation());
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Not connected, connecting to " + remoteServer());
+            }
+            FtpUtils.connect(client, endpoint.getConfiguration());
+            LOG.info("Connected to " + remoteServer());
         }
     }
 
     protected void disconnect() throws IOException {
-        LOG.debug("Disconnecting from " + 
endpoint.getConfiguration().remoteServerInformation());
-        endpoint.disconnect(client);
+        LOG.debug("Disconnecting from " + remoteServer());
+        FtpUtils.disconnect(client);
     }
 
     protected void poll() throws Exception {
@@ -80,6 +95,7 @@
             if (endpoint.getConfiguration().isDirectory()) {
                 pollDirectory(fileName);
             } else {
+                // TODO: This code can be nicer
                 int index = fileName.lastIndexOf('/');
                 if (index > -1) {
                     client.changeWorkingDirectory(fileName.substring(0, 
index));
@@ -88,23 +104,25 @@
                 pollFile(files[0]);
             }
             lastPollTime = System.currentTimeMillis();
-        } catch (FTPConnectionClosedException e) {
-            // If the server disconnected us, then we must manually disconnect
-            // the client before attempting to reconnect
-            LOG.warn("Disconnecting due to exception: " + e.getMessage());
-            disconnect();
-            // Rethrow to signify that we didn't poll
-            throw e;
-        } catch (RuntimeCamelException e) {
-            LOG.warn("Caught RuntimeCamelException: " + e.getMessage(), e);
-            LOG.warn("Hoping an explicit disconnect/reconnect will solve the 
problem");
-            disconnect();
-            // Rethrow to signify that we didn't poll
-            throw e;
+        } catch (Exception e) {
+            if (isStopping() || isStopped()) {
+                // if we are stopping then ignore any exception during a poll
+                LOG.warn( "Consumer is stopping. Ignoring caught exception: " +
+                    e.getClass().getCanonicalName() + " message: " + 
e.getMessage());
+            } else {
+                LOG.warn("Exception occured during polling: " +
+                    e.getClass().getCanonicalName() + " message: " + 
e.getMessage());
+                disconnect();
+                // Rethrow to signify that we didn't poll
+                throw e;
+            }
         }
     }
 
     protected void pollDirectory(String dir) throws Exception {
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Polling directory: " + dir);
+        }
         String currentDir = client.printWorkingDirectory();
 
         client.changeWorkingDirectory(dir);
@@ -136,7 +154,6 @@
         long ts = ftpFile.getTimestamp().getTimeInMillis();
         // TODO do we need to adjust the TZ? can we?
         if (ts > lastPollTime && isMatched(ftpFile)) {
-            String remoteServer =  
endpoint.getConfiguration().remoteServerInformation();
             String fullFileName = getFullFileName(ftpFile);
 
             // is we use excluse read then acquire the exclusive read (waiting 
until we got it)
@@ -148,7 +165,7 @@
             final ByteArrayOutputStream byteArrayOutputStream = new 
ByteArrayOutputStream();
             client.retrieveFile(ftpFile.getName(), byteArrayOutputStream);
             if (LOG.isDebugEnabled()) {
-                LOG.debug("Retrieved file: " + ftpFile.getName() + " from: " + 
remoteServer);
+                LOG.debug("Retrieved file: " + ftpFile.getName() + " from: " + 
remoteServer());
             }
 
             RemoteFileExchange exchange = 
endpoint.createExchange(fullFileName, byteArrayOutputStream);
@@ -168,12 +185,12 @@
             if (deleteFile) {
                 // delete file after consuming
                 if (LOG.isDebugEnabled()) {
-                    LOG.debug("Deleteing file: " + ftpFile.getName() + " from: 
" + remoteServer);
+                    LOG.debug("Deleteing file: " + ftpFile.getName() + " from: 
" + remoteServer());
                 }
                 boolean deleted = client.deleteFile(ftpFile.getName());
                 if (!deleted) {
                     // ignore just log a warning
-                    LOG.warn("Could not delete file: " + ftpFile.getName() + " 
from: " + remoteServer);
+                    LOG.warn("Could not delete file: " + ftpFile.getName() + " 
from: " + remoteServer());
                 }
             }
 
@@ -187,7 +204,7 @@
         // the trick is to try to rename the file, if we can rename then we 
have exclusive read
         // since its a remote file we can not use java.nio to get a RW access
         String originalName = ftpFile.getName();
-        String newName = originalName + ".camel";
+        String newName = originalName + ".camelExclusiveRead";
         boolean exclusive = false;
         while (! exclusive) {
             exclusive = client.rename(originalName, newName);
@@ -208,6 +225,10 @@
         }
     }
 
+    private String remoteServer() {
+        return endpoint.getConfiguration().remoteServerInformation();
+    }
+
     protected boolean isMatched(FTPFile file) {
         boolean result = true;
         if (regexPattern != null && regexPattern.length() > 0) {

Modified: 
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpEndpoint.java
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpEndpoint.java?rev=680078&r1=680077&r2=680078&view=diff
==============================================================================
--- 
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpEndpoint.java
 (original)
+++ 
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpEndpoint.java
 Sun Jul 27 01:48:09 2008
@@ -16,12 +16,9 @@
  */
 package org.apache.camel.component.file.remote;
 
-import java.io.IOException;
-
 import org.apache.camel.Processor;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.commons.net.ftp.FTPClient;
 
 public class FtpEndpoint extends RemoteFileEndpoint<RemoteFileExchange> {
     private static final transient Log LOG = 
LogFactory.getLog(FtpEndpoint.class);
@@ -35,37 +32,13 @@
     }
 
     public FtpProducer createProducer() throws Exception {
-        return new FtpProducer(this, createFtpClient());
+        return new FtpProducer(this, FtpUtils.createNewFtpClient());
     }
 
     public FtpConsumer createConsumer(Processor processor) throws Exception {
-        final FtpConsumer consumer = new FtpConsumer(this, processor, 
createFtpClient());
+        final FtpConsumer consumer = new FtpConsumer(this, processor, 
FtpUtils.createNewFtpClient());
         configureConsumer(consumer);
         return consumer;
     }
 
-    protected FTPClient createFtpClient() {
-        return new FTPClient();
-    }
-
-    public void connect(FTPClient client) throws IOException {
-        // TODO: connect and disconnect. createFtpClient should be moved to 
another class they don't
-        // belong on this endpoint class that is only for Camel related stuff 
-        RemoteFileConfiguration config = getConfiguration();
-        String host = config.getHost();
-        int port = config.getPort();
-        String username = config.getUsername();
-
-        client.connect(host, port);
-        if (username != null) {
-            client.login(username, config.getPassword());
-        } else {
-            client.login("anonymous", null);
-        }
-        client.setFileType(config.isBinary() ? FTPClient.BINARY_FILE_TYPE : 
FTPClient.ASCII_FILE_TYPE);
-    }
-
-    public void disconnect(FTPClient client) throws IOException {
-        client.disconnect();
-    }
 }

Modified: 
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpOperationFailedException.java
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpOperationFailedException.java?rev=680078&r1=680077&r2=680078&view=diff
==============================================================================
--- 
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpOperationFailedException.java
 (original)
+++ 
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpOperationFailedException.java
 Sun Jul 27 01:48:09 2008
@@ -20,13 +20,14 @@
 
 /**
  * @version $Revision$
+ * @deprecated not used will be removed in Camel 2.0.
  */
 public class FtpOperationFailedException extends RuntimeCamelException {
     private final int code;
     private final String reason;
 
     public FtpOperationFailedException(int code, String reason) {
-        super("Ftp Operation failed: " + reason.trim() + " Code: " + code);
+        super("Ftp operation failed: " + reason + " Code: " + code);
         this.code = code;
         this.reason = reason;
     }

Modified: 
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpProducer.java
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpProducer.java?rev=680078&r1=680077&r2=680078&view=diff
==============================================================================
--- 
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpProducer.java
 (original)
+++ 
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpProducer.java
 Sun Jul 27 01:48:09 2008
@@ -39,44 +39,49 @@
     }
 
     public void process(Exchange exchange) throws Exception {
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Processing " + endpoint.getConfiguration());
+        }
         connectIfNecessary();
         // If the attempt to connect isn't successful, then the thrown
         // exception will signify that we couldn't deliver
         try {
             process(endpoint.createExchange(exchange));
-        } catch (FTPConnectionClosedException e) {
-            // If the server disconnected us, then we must manually disconnect
-            // the client before attempting to reconnect
-            LOG.warn("Disconnecting due to exception: " + e.getMessage());
-            disconnect();
-            // Rethrow to signify that we didn't deliver
-            throw e;
-        } catch (RuntimeCamelException e) {
-            LOG.warn("Caught RuntimeCamelException: " + e.getMessage(), e);
-            LOG.warn("Hoping an explicit disconnect/reconnect will solve the 
problem");
-            disconnect();
-            // Rethrow to signify that we didn't deliver
-            throw e;
+        } catch (Exception e) {
+            if (isStopping() || isStopped()) {
+                // if we are stopping then ignore any exception during a poll
+                LOG.warn( "Producer is stopping. Ignoring caught exception: " +
+                    e.getClass().getCanonicalName() + " message: " + 
e.getMessage());
+            } else {
+                LOG.warn("Exception occured during processing: " +
+                    e.getClass().getCanonicalName() + " message: " + 
e.getMessage());
+                disconnect();
+                // Rethrow to signify that we didn't poll
+                throw e;
+            }
         }
     }
 
     protected void connectIfNecessary() throws IOException {
         if (!client.isConnected()) {
-            LOG.debug("Not connected, trying to reconnect.");
-            endpoint.connect(client);
-            LOG.info("Connected to " + 
endpoint.getConfiguration().remoteServerInformation());
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Not connected, connecting to " + remoteServer());
+            }
+            FtpUtils.connect(client, endpoint.getConfiguration());
+            LOG.info("Connected to " + remoteServer());
         }
     }
 
     public void disconnect() throws IOException {
-        LOG.debug("Disconnecting from " + 
endpoint.getConfiguration().remoteServerInformation());
-        endpoint.disconnect(client);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Disconnecting from " + remoteServer());
+        }
+        FtpUtils.disconnect(client);
     }
 
     public void process(RemoteFileExchange exchange) throws Exception {
         InputStream payload = exchange.getIn().getBody(InputStream.class);
         try {
-            String remoteServer = 
endpoint.getConfiguration().remoteServerInformation();
             String fileName = createFileName(exchange.getIn(), 
endpoint.getConfiguration());
 
             int lastPathIndex = fileName.lastIndexOf('/');
@@ -89,12 +94,10 @@
 
             boolean success = client.storeFile(fileName, payload);
             if (!success) {
-                throw new RuntimeCamelException("Error sending file: " + 
fileName + " to: " + remoteServer);
+                throw new RuntimeCamelException("Error sending file: " + 
fileName + " to: " + remoteServer());
             }
 
-            if (LOG.isInfoEnabled()) {
-                LOG.info("Sent: " + fileName + " to: " + remoteServer);
-            }
+            LOG.info("Sent: " + fileName + " to: " + remoteServer());
         } finally {
             if (payload != null) {
                 payload.close();
@@ -105,11 +108,8 @@
     @Override
     protected void doStart() throws Exception {
         LOG.info("Starting");
-        try {
-            connectIfNecessary();
-        } catch (IOException e) {
-            LOG.warn("Couldn't connect to: " + 
endpoint.getConfiguration().remoteServerInformation());
-        }
+        // do not connect when componet starts, just wait until we process as 
we will
+        // connect at that time if needed
         super.doStart();
     }
 
@@ -141,4 +141,8 @@
         return success;
     }
 
+    private String remoteServer() {
+        return endpoint.getConfiguration().remoteServerInformation();
+    }
+
 }

Added: 
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpUtils.java
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpUtils.java?rev=680078&view=auto
==============================================================================
--- 
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpUtils.java
 (added)
+++ 
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpUtils.java
 Sun Jul 27 01:48:09 2008
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file.remote;
+
+import java.io.IOException;
+
+import org.apache.commons.net.ftp.FTPClient;
+
+/**
+ * Utility methods for FTP.
+ */
+public class FtpUtils {
+
+    private FtpUtils() {
+    }
+
+    public static void connect(FTPClient client, RemoteFileConfiguration 
config) throws IOException {
+        String host = config.getHost();
+        int port = config.getPort();
+        String username = config.getUsername();
+
+        client.connect(host, port);
+        if (username != null) {
+            client.login(username, config.getPassword());
+        } else {
+            client.login("anonymous", null);
+        }
+        client.setFileType(config.isBinary() ? FTPClient.BINARY_FILE_TYPE : 
FTPClient.ASCII_FILE_TYPE);
+    }
+
+    public static void disconnect(FTPClient client) throws IOException {
+        if (client.isConnected()) {
+            client.disconnect();
+        }
+    }
+
+    public static FTPClient createNewFtpClient() {
+        return new FTPClient();
+    }
+
+}

Modified: 
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileBinding.java
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileBinding.java?rev=680078&r1=680077&r2=680078&view=diff
==============================================================================
--- 
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileBinding.java
 (original)
+++ 
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileBinding.java
 Sun Jul 27 01:48:09 2008
@@ -19,6 +19,7 @@
 import java.io.OutputStream;
 
 public class RemoteFileBinding {
+
     public Object extractBodyFromOutputStream(RemoteFileExchange exchange, 
OutputStream outputStream) {
         return outputStream;
     }

Modified: 
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConverter.java
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConverter.java?rev=680078&r1=680077&r2=680078&view=diff
==============================================================================
--- 
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConverter.java
 (original)
+++ 
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConverter.java
 Sun Jul 27 01:48:09 2008
@@ -29,6 +29,7 @@
  */
 @Converter
 public final class RemoteFileConverter {
+    
     private RemoteFileConverter() {
         // Helper Class
     }

Modified: 
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java?rev=680078&r1=680077&r2=680078&view=diff
==============================================================================
--- 
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java
 (original)
+++ 
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java
 Sun Jul 27 01:48:09 2008
@@ -55,6 +55,18 @@
         this.session = session;
     }
 
+    protected void doStart() throws Exception {
+        LOG.info("Starting");
+        super.doStart();
+    }
+
+    protected void doStop() throws Exception {
+        LOG.info("Stopping");
+        // disconnect when stopping
+        disconnect();
+        super.doStop();
+    }
+
     protected void connectIfNecessary() throws JSchException {
         if (channel == null || !channel.isConnected()) {
             if (session == null || !session.isConnected()) {
@@ -65,17 +77,18 @@
             LOG.debug("Channel isn't connected, trying to recreate and 
connect.");
             channel = endpoint.createChannelSftp(session);
             channel.connect();
-            LOG.info("Connected to " + 
endpoint.getConfiguration().remoteServerInformation());
+            LOG.info("Connected to " + remoteServer());
         }
     }
 
     protected void disconnect() throws JSchException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Disconnecting from " + remoteServer());
+        }
         if (session != null) {
-            LOG.debug("Session is being explicitly disconnected");
             session.disconnect();
         }
         if (channel != null) {
-            LOG.debug("Channel is being explicitly disconnected");
             channel.disconnect();
         }
     }
@@ -92,29 +105,31 @@
             if (endpoint.getConfiguration().isDirectory()) {
                 pollDirectory(fileName);
             } else {
+                // TODO: This code could be neater
                 channel.cd(fileName.substring(0, fileName.lastIndexOf('/')));
                 final ChannelSftp.LsEntry file = 
(ChannelSftp.LsEntry)channel.ls(fileName.substring(fileName.lastIndexOf('/') + 
1)).get(0);
                 pollFile(file);
             }
             lastPollTime = System.currentTimeMillis();
-        } catch (JSchException e) {
-            // If the connection has gone stale, then we must manually 
disconnect
-            // the client before attempting to reconnect
-            LOG.warn("Disconnecting due to exception: " + e.getMessage());
-            disconnect();
-            // Rethrow to signify that we didn't poll
-            throw e;
-        } catch (SftpException e) {
-            // Still not sure if/when these come up and what we should do 
about them
-            // client.disconnect();
-            LOG.warn("Caught SftpException:" + e.getMessage(), e);
-            LOG.warn("Hoping an explicit disconnect/reconnect will solve the 
problem");
-            // Rethrow to signify that we didn't poll
-            throw e;
+        } catch (Exception e) {
+            if (isStopping() || isStopped()) {
+                // if we are stopping then ignore any exception during a poll
+                LOG.warn( "Consumer is stopping. Ignoring caught exception: " +
+                    e.getClass().getCanonicalName() + " message: " + 
e.getMessage());
+            } else {
+                LOG.warn("Exception occured during polling: " +
+                    e.getClass().getCanonicalName() + " message: " + 
e.getMessage());
+                disconnect();
+                // Rethrow to signify that we didn't poll
+                throw e;
+            }
         }
     }
 
     protected void pollDirectory(String dir) throws Exception {
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Polling directory: " + dir);
+        }
         String currentDir = channel.pwd();
 
         channel.cd(dir);
@@ -147,7 +162,7 @@
 
         // TODO do we need to adjust the TZ? can we?
         if (ts > lastPollTime && isMatched(sftpFile)) {
-            String remoteServer =  
endpoint.getConfiguration().remoteServerInformation();
+            String fullFileName = getFullFileName(sftpFile);
 
             // is we use excluse read then acquire the exclusive read (waiting 
until we got it)
             if (exclusiveRead) {
@@ -158,15 +173,18 @@
             final ByteArrayOutputStream byteArrayOutputStream = new 
ByteArrayOutputStream();
             channel.get(sftpFile.getFilename(), byteArrayOutputStream);
             if (LOG.isDebugEnabled()) {
-                LOG.debug("Retrieved file: " + sftpFile.getFilename() + " 
from: " + remoteServer);
+                LOG.debug("Retrieved file: " + sftpFile.getFilename() + " 
from: " + remoteServer());
             }
 
             RemoteFileExchange exchange = 
endpoint.createExchange(getFullFileName(sftpFile), byteArrayOutputStream);
 
             if (isSetNames()) {
-                String relativePath = 
getFullFileName(sftpFile).substring(endpoint.getConfiguration().getFile().length());
-                if (relativePath.startsWith("/")) {
-                    relativePath = relativePath.substring(1);
+                String ftpBasePath = endpoint.getConfiguration().getFile();
+                String relativePath = 
fullFileName.substring(ftpBasePath.length() + 1);
+                relativePath = relativePath.replaceFirst("/", "");
+
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Setting exchange filename to " + relativePath);
                 }
                 exchange.getIn().setHeader(FileComponent.HEADER_FILE_NAME, 
relativePath);
             }
@@ -174,13 +192,13 @@
             if (deleteFile) {
                 // delete file after consuming
                 if (LOG.isDebugEnabled()) {
-                    LOG.debug("Deleteing file: " + sftpFile.getFilename() + " 
from: " + remoteServer);
+                    LOG.debug("Deleteing file: " + sftpFile.getFilename() + " 
from: " + remoteServer());
                 }
                 try {
                     channel.rm(sftpFile.getFilename());
                 } catch (SftpException e) {
                     // ignore just log a warning
-                    LOG.warn("Could not delete file: " + 
sftpFile.getFilename() + " from: " + remoteServer);
+                    LOG.warn("Could not delete file: " + 
sftpFile.getFilename() + " from: " + remoteServer());
                 }
             }
 
@@ -194,7 +212,7 @@
         // the trick is to try to rename the file, if we can rename then we 
have exclusive read
         // since its a remote file we can not use java.nio to get a RW access
         String originalName = sftpFile.getFilename();
-        String newName = originalName + ".camel";
+        String newName = originalName + ".camelExclusiveRead";
         boolean exclusive = false;
         while (! exclusive) {
             try {
@@ -221,6 +239,10 @@
         }
     }
 
+    private String remoteServer() {
+        return endpoint.getConfiguration().remoteServerInformation();
+    }
+
     protected boolean isMatched(ChannelSftp.LsEntry sftpFile) {
         boolean result = true;
         if (regexPattern != null && regexPattern.length() > 0) {

Modified: 
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpProducer.java
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpProducer.java?rev=680078&r1=680077&r2=680078&view=diff
==============================================================================
--- 
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpProducer.java
 (original)
+++ 
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpProducer.java
 Sun Jul 27 01:48:09 2008
@@ -41,6 +41,30 @@
         this.session = session;
     }
 
+    public void process(Exchange exchange) throws Exception {
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Processing " + endpoint.getConfiguration());
+        }
+        connectIfNecessary();
+        // If the attempt to connect isn't successful, then the thrown
+        // exception will signify that we couldn't deliver
+        try {
+            process(endpoint.createExchange(exchange));
+        } catch (Exception e) {
+            if (isStopping() || isStopped()) {
+                // if we are stopping then ignore any exception thrown during processing
+                LOG.warn( "Producer is stopping. Ignoring caught exception: " +
+                    e.getClass().getCanonicalName() + " message: " + 
e.getMessage());
+            } else {
+                LOG.warn("Exception occured during processing: " +
+                    e.getClass().getCanonicalName() + " message: " + 
e.getMessage());
+                disconnect();
+                // Rethrow to signify that we didn't deliver
+                throw e;
+            }
+        }
+    }
+
     protected void connectIfNecessary() throws JSchException {
         if (channel == null || !channel.isConnected()) {
             if (session == null || !session.isConnected()) {
@@ -56,40 +80,17 @@
     }
 
     protected void disconnect() throws JSchException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Disconnecting from " + remoteServer());
+        }
         if (session != null) {
-            LOG.debug("Session is being explicitly disconnected");
             session.disconnect();
         }
         if (channel != null) {
-            LOG.debug("Channel is being explicitly disconnected");
             channel.disconnect();
         }
     }
 
-    public void process(Exchange exchange) throws Exception {
-        connectIfNecessary();
-        // If the attempt to connect isn't successful, then the thrown
-        // exception will signify that we couldn't deliver
-        try {
-            process(endpoint.createExchange(exchange));
-        } catch (JSchException e) {
-            // If the connection has gone stale, then we must manually 
disconnect
-            // the client before attempting to reconnect
-            LOG.warn("Disconnecting due to exception: " + e.getMessage());
-            disconnect();
-            // Rethrow to signify that we didn't deliver
-            throw e;
-        } catch (SftpException e) {
-            // Still not sure if/when these come up and what we should do 
about them
-            // client.disconnect();
-            LOG.warn("Caught SftpException:" + e.getMessage(), e);
-            LOG.warn("Hoping an explicit disconnect/reconnect will solve the 
problem");
-            disconnect();
-            // Rethrow to signify that we didn't deliver
-            throw e;
-        }
-    }
-
     public void process(RemoteFileExchange exchange) throws Exception {
         InputStream payload = exchange.getIn().getBody(InputStream.class);
         try {
@@ -107,9 +108,7 @@
 
             channel.put(payload, fileName);
 
-            if (LOG.isInfoEnabled()) {
-                LOG.info("Sent: " + fileName + " to: " + remoteServer);
-            }
+            LOG.info("Sent: " + fileName + " to: " + remoteServer);
         } finally {
             if (payload != null) {
                 payload.close();
@@ -120,11 +119,8 @@
     @Override
     protected void doStart() throws Exception {
         LOG.info("Starting");
-        try {
-            connectIfNecessary();
-        } catch (JSchException e) {
-            LOG.warn("Couldn't connect to: " + 
endpoint.getConfiguration().remoteServerInformation());
-        }
+        // do not connect when component starts, just wait until we process as 
we will
+        // connect at that time if needed
         super.doStart();
     }
 
@@ -164,4 +160,8 @@
         return success;
     }
 
+    private String remoteServer() {
+        return endpoint.getConfiguration().remoteServerInformation();
+    }
+
 }
\ No newline at end of file

Modified: 
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpExclusiveReadTest.java
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpExclusiveReadTest.java?rev=680078&r1=680077&r2=680078&view=diff
==============================================================================
--- 
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpExclusiveReadTest.java
 (original)
+++ 
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpExclusiveReadTest.java
 Sun Jul 27 01:48:09 2008
@@ -31,7 +31,7 @@
 
     private static final Log LOG = 
LogFactory.getLog(FromFtpExclusiveReadTest.class);
 
-    private String port = "20019";
+    private String port = "20025";
     private String ftpUrl = "ftp://[EMAIL PROTECTED]:" + port + 
"/slowfile?password=admin&binary=false&consumer.exclusiveRead=true&consumer.delay=500";
 
     public String getPort() {

Copied: 
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpNonExclusiveReadTest.java
 (from r680012, 
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpExclusiveReadTest.java)
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpNonExclusiveReadTest.java?p2=activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpNonExclusiveReadTest.java&p1=activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpExclusiveReadTest.java&r1=680012&r2=680078&rev=680078&view=diff
==============================================================================
--- 
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpExclusiveReadTest.java
 (original)
+++ 
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpNonExclusiveReadTest.java
 Sun Jul 27 01:48:09 2008
@@ -25,14 +25,14 @@
 import org.apache.commons.logging.LogFactory;
 
 /**
- * Unit test to verify exclusive read - that we do not poll files that is in 
progress of being written.
+ * Unit test to verify *NON* exclusive read.
  */
-public class FromFtpExclusiveReadTest extends FtpServerTestSupport {
+public class FromFtpNonExclusiveReadTest extends FtpServerTestSupport {
 
     private static final Log LOG = 
LogFactory.getLog(FromFtpExclusiveReadTest.class);
 
-    private String port = "20019";
-    private String ftpUrl = "ftp://[EMAIL PROTECTED]:" + port + 
"/slowfile?password=admin&binary=false&consumer.exclusiveRead=true&consumer.delay=500";
+    private String port = "20027";
+    private String ftpUrl = "ftp://[EMAIL PROTECTED]:" + port + 
"/slowfile?password=admin&binary=false&consumer.exclusiveRead=false&consumer.delay=500";
 
     public String getPort() {
         return port;
@@ -43,11 +43,15 @@
         createDirectory("./res/home/slowfile");
         MockEndpoint mock = getMockEndpoint("mock:result");
         mock.expectedMessageCount(1);
-        mock.expectedBodiesReceived("Hello WorldLine #0Line #1Line #2Bye 
World");
 
         createSlowFile();
 
         mock.assertIsSatisfied();
+
+        // we read only part of the file as we don't have exclusive read and 
thus read part of the
+        // file currently in progress of being written - so we get only the 
Hello World part
+        String body = mock.getExchanges().get(0).getIn().getBody(String.class);
+        assertFalse("Should not get the entire file", body.endsWith("Bye 
World"));
     }
 
     private void createSlowFile() throws Exception {
@@ -77,4 +81,4 @@
         File file = new File(s);
         file.mkdirs();
     }
-}
+}
\ No newline at end of file

Copied: 
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpPollFileOnlyTest.java
 (from r679955, 
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpToAsciiFileTest.java)
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpPollFileOnlyTest.java?p2=activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpPollFileOnlyTest.java&p1=activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpToAsciiFileTest.java&r1=679955&r2=680078&rev=680078&view=diff
==============================================================================
--- 
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpToAsciiFileTest.java
 (original)
+++ 
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpPollFileOnlyTest.java
 Sun Jul 27 01:48:09 2008
@@ -16,8 +16,6 @@
  */
 package org.apache.camel.component.file.remote;
 
-import java.io.File;
-
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.Producer;
@@ -26,29 +24,19 @@
 import org.apache.camel.component.mock.MockEndpoint;
 
 /**
- * Unit test to verify that we can pool an ASCII file from the FTP Server and 
store it on a local file path
+ * Unit test to poll a file from the FTP server and not a folder as most tests 
do.
  */
-public class FromFtpToAsciiFileTest extends FtpServerTestSupport {
+public class FromFtpPollFileOnlyTest extends FtpServerTestSupport {
 
-    private String port = "20013";
-    private String ftpUrl = "ftp://[EMAIL PROTECTED]:" + port + 
"/tmp3/camel?password=admin&binary=false";
+    private String port = "20028";
+    private String ftpUrl = "ftp://[EMAIL PROTECTED]:" + port + 
"/fileonly/report.txt?password=admin&directory=false";
 
-    public void testFtpRoute() throws Exception {
-        MockEndpoint resultEndpoint = getMockEndpoint("mock:result");
-        resultEndpoint.expectedMinimumMessageCount(1);
-        resultEndpoint.expectedBodiesReceived("Hello World from FTPServer");
-        resultEndpoint.assertIsSatisfied();
-
-        // wait until the file producer has written the file
-        Thread.sleep(1000);
-
-        // assert the file
-        File file = new File("target/ftptest/deleteme.txt");
-        assertTrue("The ASCII file should exists", file.exists());
-        assertTrue("File size wrong", file.length() > 10);
+    public void testPollFileOnly() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMinimumMessageCount(1);
+        mock.expectedBodiesReceived("Hello World from FTPServer");
 
-        // let some time pass to let the consumer etc. properly do its 
business before closing
-        Thread.sleep(1000);
+        mock.assertIsSatisfied();
     }
 
     public String getPort() {
@@ -64,10 +52,10 @@
     private void prepareFtpServer() throws Exception {
         // prepares the FTP Server by creating a file on the server that we 
want to unit
         // test that we can pool and store as a local file
-        Endpoint endpoint = context.getEndpoint(ftpUrl);
+        Endpoint endpoint = context.getEndpoint("ftp://[EMAIL PROTECTED]:" + 
port + "/fileonly/?password=admin&binary=false");
         Exchange exchange = endpoint.createExchange();
         exchange.getIn().setBody("Hello World from FTPServer");
-        exchange.getIn().setHeader(FileComponent.HEADER_FILE_NAME, 
"hello.txt");
+        exchange.getIn().setHeader(FileComponent.HEADER_FILE_NAME, 
"report.txt");
         Producer producer = endpoint.createProducer();
         producer.start();
         producer.process(exchange);
@@ -77,12 +65,9 @@
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
             public void configure() throws Exception {
-                String fileUrl = "file:target/ftptest/?append=false&noop=true";
-                from(ftpUrl).setHeader(FileComponent.HEADER_FILE_NAME, 
constant("deleteme.txt")).
-                        
convertBodyTo(String.class).to(fileUrl).to("mock:result");
+                from(ftpUrl).to("mock:result");
             }
         };
     }
 
-}
-
+}
\ No newline at end of file

Propchange: 
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpPollFileOnlyTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpPollFileOnlyTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: 
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpProducerBuildDirectoryTest.java
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpProducerBuildDirectoryTest.java?rev=680078&r1=680077&r2=680078&view=diff
==============================================================================
--- 
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpProducerBuildDirectoryTest.java
 (original)
+++ 
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpProducerBuildDirectoryTest.java
 Sun Jul 27 01:48:09 2008
@@ -27,7 +27,7 @@
  */
 public class FtpProducerBuildDirectoryTest extends FtpServerTestSupport {
 
-    private String port = "20018";
+    private String port = "20026";
     private String ftpUrl = "ftp://[EMAIL PROTECTED]:" + port + 
"/upload/user/claus?binary=false&password=admin";
 
     public String getPort() {

Modified: 
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpServerTestSupport.java
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpServerTestSupport.java?rev=680078&r1=680077&r2=680078&view=diff
==============================================================================
--- 
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpServerTestSupport.java
 (original)
+++ 
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpServerTestSupport.java
 Sun Jul 27 01:48:09 2008
@@ -42,6 +42,7 @@
     protected void tearDown() throws Exception {
         super.tearDown();
         // must stop server after super to let the clients stop correctly 
(CAMEL-444)
+        ftpServer.getServerContext().dispose();
         ftpServer.stop();
     }
 

Modified: 
activemq/camel/trunk/components/camel-ftp/src/test/resources/log4j.properties
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ftp/src/test/resources/log4j.properties?rev=680078&r1=680077&r2=680078&view=diff
==============================================================================
--- 
activemq/camel/trunk/components/camel-ftp/src/test/resources/log4j.properties 
(original)
+++ 
activemq/camel/trunk/components/camel-ftp/src/test/resources/log4j.properties 
Sun Jul 27 01:48:09 2008
@@ -21,7 +21,7 @@
 log4j.rootLogger=INFO, file
 
 # uncomment the following to enable camel debugging
-log4j.logger.org.apache.camel.component.file=DEBUG
+log4j.logger.org.apache.camel.component.file=TRACE
 log4j.logger.org.apache.mina=WARN
 log4j.logger.org.apache.ftpserver=WARN
 


Reply via email to