Author: davsclaus
Date: Sun Jul 27 01:48:09 2008
New Revision: 680078
URL: http://svn.apache.org/viewvc?rev=680078&view=rev
Log:
Polished camel-ftp. Added missing unit test. Trying to close Apache FTP Server
so Bamboo is happy. Better handling of exceptions thrown during ftp consumer if
it's stopping etc.
Added:
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpUtils.java
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpNonExclusiveReadTest.java
- copied, changed from r680012,
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpExclusiveReadTest.java
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpPollFileOnlyTest.java
(contents, props changed)
- copied, changed from r679955,
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpToAsciiFileTest.java
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpEndpoint.java
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpOperationFailedException.java
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpProducer.java
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileBinding.java
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConverter.java
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpProducer.java
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpExclusiveReadTest.java
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpProducerBuildDirectoryTest.java
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpServerTestSupport.java
activemq/camel/trunk/components/camel-ftp/src/test/resources/log4j.properties
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java?rev=680078&r1=680077&r2=680078&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java
(original)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java
Sun Jul 27 01:48:09 2008
@@ -172,7 +172,7 @@
// file is still in progress of being written (slow writer)
// TODO: Seems to not work on Unix boxes (see the unit test
FileExclusiveReadTest)
String originalName = file.getAbsolutePath();
- File newName = new File(originalName + ".exclusiveRead");
+ File newName = new File(originalName + ".camelExclusiveRead");
boolean exclusive = false;
while (! exclusive) {
exclusive = file.renameTo(newName);
Modified:
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java?rev=680078&r1=680077&r2=680078&view=diff
==============================================================================
---
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java
(original)
+++
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java
Sun Jul 27 01:48:09 2008
@@ -22,6 +22,7 @@
import org.apache.camel.Processor;
import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.util.ObjectHelper;
import org.apache.camel.component.file.FileComponent;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -55,17 +56,31 @@
this.client = client;
}
+ protected void doStart() throws Exception {
+ LOG.info("Starting");
+ super.doStart();
+ }
+
+ protected void doStop() throws Exception {
+ LOG.info("Stopping");
+ // disconnect when stopping
+ disconnect();
+ super.doStop();
+ }
+
protected void connectIfNecessary() throws IOException {
if (!client.isConnected()) {
- LOG.debug("Not connected, trying to reconnect.");
- endpoint.connect(client);
- LOG.info("Connected to " +
endpoint.getConfiguration().remoteServerInformation());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Not connected, connecting to " + remoteServer());
+ }
+ FtpUtils.connect(client, endpoint.getConfiguration());
+ LOG.info("Connected to " + remoteServer());
}
}
protected void disconnect() throws IOException {
- LOG.debug("Disconnecting from " +
endpoint.getConfiguration().remoteServerInformation());
- endpoint.disconnect(client);
+ LOG.debug("Disconnecting from " + remoteServer());
+ FtpUtils.disconnect(client);
}
protected void poll() throws Exception {
@@ -80,6 +95,7 @@
if (endpoint.getConfiguration().isDirectory()) {
pollDirectory(fileName);
} else {
+ // TODO: This code can be nicer
int index = fileName.lastIndexOf('/');
if (index > -1) {
client.changeWorkingDirectory(fileName.substring(0,
index));
@@ -88,23 +104,25 @@
pollFile(files[0]);
}
lastPollTime = System.currentTimeMillis();
- } catch (FTPConnectionClosedException e) {
- // If the server disconnected us, then we must manually disconnect
- // the client before attempting to reconnect
- LOG.warn("Disconnecting due to exception: " + e.getMessage());
- disconnect();
- // Rethrow to signify that we didn't poll
- throw e;
- } catch (RuntimeCamelException e) {
- LOG.warn("Caught RuntimeCamelException: " + e.getMessage(), e);
- LOG.warn("Hoping an explicit disconnect/reconnect will solve the
problem");
- disconnect();
- // Rethrow to signify that we didn't poll
- throw e;
+ } catch (Exception e) {
+ if (isStopping() || isStopped()) {
+ // if we are stopping then ignore any exception during a poll
+ LOG.warn( "Consumer is stopping. Ignoring caught exception: " +
+ e.getClass().getCanonicalName() + " message: " +
e.getMessage());
+ } else {
+ LOG.warn("Exception occured during polling: " +
+ e.getClass().getCanonicalName() + " message: " +
e.getMessage());
+ disconnect();
+ // Rethrow to signify that we didn't poll
+ throw e;
+ }
}
}
protected void pollDirectory(String dir) throws Exception {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Polling directory: " + dir);
+ }
String currentDir = client.printWorkingDirectory();
client.changeWorkingDirectory(dir);
@@ -136,7 +154,6 @@
long ts = ftpFile.getTimestamp().getTimeInMillis();
// TODO do we need to adjust the TZ? can we?
if (ts > lastPollTime && isMatched(ftpFile)) {
- String remoteServer =
endpoint.getConfiguration().remoteServerInformation();
String fullFileName = getFullFileName(ftpFile);
// is we use excluse read then acquire the exclusive read (waiting
until we got it)
@@ -148,7 +165,7 @@
final ByteArrayOutputStream byteArrayOutputStream = new
ByteArrayOutputStream();
client.retrieveFile(ftpFile.getName(), byteArrayOutputStream);
if (LOG.isDebugEnabled()) {
- LOG.debug("Retrieved file: " + ftpFile.getName() + " from: " +
remoteServer);
+ LOG.debug("Retrieved file: " + ftpFile.getName() + " from: " +
remoteServer());
}
RemoteFileExchange exchange =
endpoint.createExchange(fullFileName, byteArrayOutputStream);
@@ -168,12 +185,12 @@
if (deleteFile) {
// delete file after consuming
if (LOG.isDebugEnabled()) {
- LOG.debug("Deleteing file: " + ftpFile.getName() + " from:
" + remoteServer);
+ LOG.debug("Deleteing file: " + ftpFile.getName() + " from:
" + remoteServer());
}
boolean deleted = client.deleteFile(ftpFile.getName());
if (!deleted) {
// ignore just log a warning
- LOG.warn("Could not delete file: " + ftpFile.getName() + "
from: " + remoteServer);
+ LOG.warn("Could not delete file: " + ftpFile.getName() + "
from: " + remoteServer());
}
}
@@ -187,7 +204,7 @@
// the trick is to try to rename the file, if we can rename then we
have exclusive read
// since its a remote file we can not use java.nio to get a RW access
String originalName = ftpFile.getName();
- String newName = originalName + ".camel";
+ String newName = originalName + ".camelExclusiveRead";
boolean exclusive = false;
while (! exclusive) {
exclusive = client.rename(originalName, newName);
@@ -208,6 +225,10 @@
}
}
+ private String remoteServer() {
+ return endpoint.getConfiguration().remoteServerInformation();
+ }
+
protected boolean isMatched(FTPFile file) {
boolean result = true;
if (regexPattern != null && regexPattern.length() > 0) {
Modified:
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpEndpoint.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpEndpoint.java?rev=680078&r1=680077&r2=680078&view=diff
==============================================================================
---
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpEndpoint.java
(original)
+++
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpEndpoint.java
Sun Jul 27 01:48:09 2008
@@ -16,12 +16,9 @@
*/
package org.apache.camel.component.file.remote;
-import java.io.IOException;
-
import org.apache.camel.Processor;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.commons.net.ftp.FTPClient;
public class FtpEndpoint extends RemoteFileEndpoint<RemoteFileExchange> {
private static final transient Log LOG =
LogFactory.getLog(FtpEndpoint.class);
@@ -35,37 +32,13 @@
}
public FtpProducer createProducer() throws Exception {
- return new FtpProducer(this, createFtpClient());
+ return new FtpProducer(this, FtpUtils.createNewFtpClient());
}
public FtpConsumer createConsumer(Processor processor) throws Exception {
- final FtpConsumer consumer = new FtpConsumer(this, processor,
createFtpClient());
+ final FtpConsumer consumer = new FtpConsumer(this, processor,
FtpUtils.createNewFtpClient());
configureConsumer(consumer);
return consumer;
}
- protected FTPClient createFtpClient() {
- return new FTPClient();
- }
-
- public void connect(FTPClient client) throws IOException {
- // TODO: connect and disconnect. createFtpClient should be moved to
another class they don't
- // belong on this endpoint class that is only for Camel related stuff
- RemoteFileConfiguration config = getConfiguration();
- String host = config.getHost();
- int port = config.getPort();
- String username = config.getUsername();
-
- client.connect(host, port);
- if (username != null) {
- client.login(username, config.getPassword());
- } else {
- client.login("anonymous", null);
- }
- client.setFileType(config.isBinary() ? FTPClient.BINARY_FILE_TYPE :
FTPClient.ASCII_FILE_TYPE);
- }
-
- public void disconnect(FTPClient client) throws IOException {
- client.disconnect();
- }
}
Modified:
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpOperationFailedException.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpOperationFailedException.java?rev=680078&r1=680077&r2=680078&view=diff
==============================================================================
---
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpOperationFailedException.java
(original)
+++
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpOperationFailedException.java
Sun Jul 27 01:48:09 2008
@@ -20,13 +20,14 @@
/**
* @version $Revision$
+ * @deprecated not used will be removed in Camel 2.0.
*/
public class FtpOperationFailedException extends RuntimeCamelException {
private final int code;
private final String reason;
public FtpOperationFailedException(int code, String reason) {
- super("Ftp Operation failed: " + reason.trim() + " Code: " + code);
+ super("Ftp operation failed: " + reason + " Code: " + code);
this.code = code;
this.reason = reason;
}
Modified:
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpProducer.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpProducer.java?rev=680078&r1=680077&r2=680078&view=diff
==============================================================================
---
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpProducer.java
(original)
+++
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpProducer.java
Sun Jul 27 01:48:09 2008
@@ -39,44 +39,49 @@
}
public void process(Exchange exchange) throws Exception {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Processing " + endpoint.getConfiguration());
+ }
connectIfNecessary();
// If the attempt to connect isn't successful, then the thrown
// exception will signify that we couldn't deliver
try {
process(endpoint.createExchange(exchange));
- } catch (FTPConnectionClosedException e) {
- // If the server disconnected us, then we must manually disconnect
- // the client before attempting to reconnect
- LOG.warn("Disconnecting due to exception: " + e.getMessage());
- disconnect();
- // Rethrow to signify that we didn't deliver
- throw e;
- } catch (RuntimeCamelException e) {
- LOG.warn("Caught RuntimeCamelException: " + e.getMessage(), e);
- LOG.warn("Hoping an explicit disconnect/reconnect will solve the
problem");
- disconnect();
- // Rethrow to signify that we didn't deliver
- throw e;
+ } catch (Exception e) {
+ if (isStopping() || isStopped()) {
+ // if we are stopping then ignore any exception during a poll
+ LOG.warn( "Producer is stopping. Ignoring caught exception: " +
+ e.getClass().getCanonicalName() + " message: " +
e.getMessage());
+ } else {
+ LOG.warn("Exception occured during processing: " +
+ e.getClass().getCanonicalName() + " message: " +
e.getMessage());
+ disconnect();
+ // Rethrow to signify that we didn't poll
+ throw e;
+ }
}
}
protected void connectIfNecessary() throws IOException {
if (!client.isConnected()) {
- LOG.debug("Not connected, trying to reconnect.");
- endpoint.connect(client);
- LOG.info("Connected to " +
endpoint.getConfiguration().remoteServerInformation());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Not connected, connecting to " + remoteServer());
+ }
+ FtpUtils.connect(client, endpoint.getConfiguration());
+ LOG.info("Connected to " + remoteServer());
}
}
public void disconnect() throws IOException {
- LOG.debug("Disconnecting from " +
endpoint.getConfiguration().remoteServerInformation());
- endpoint.disconnect(client);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Disconnecting from " + remoteServer());
+ }
+ FtpUtils.disconnect(client);
}
public void process(RemoteFileExchange exchange) throws Exception {
InputStream payload = exchange.getIn().getBody(InputStream.class);
try {
- String remoteServer =
endpoint.getConfiguration().remoteServerInformation();
String fileName = createFileName(exchange.getIn(),
endpoint.getConfiguration());
int lastPathIndex = fileName.lastIndexOf('/');
@@ -89,12 +94,10 @@
boolean success = client.storeFile(fileName, payload);
if (!success) {
- throw new RuntimeCamelException("Error sending file: " +
fileName + " to: " + remoteServer);
+ throw new RuntimeCamelException("Error sending file: " +
fileName + " to: " + remoteServer());
}
- if (LOG.isInfoEnabled()) {
- LOG.info("Sent: " + fileName + " to: " + remoteServer);
- }
+ LOG.info("Sent: " + fileName + " to: " + remoteServer());
} finally {
if (payload != null) {
payload.close();
@@ -105,11 +108,8 @@
@Override
protected void doStart() throws Exception {
LOG.info("Starting");
- try {
- connectIfNecessary();
- } catch (IOException e) {
- LOG.warn("Couldn't connect to: " +
endpoint.getConfiguration().remoteServerInformation());
- }
+ // do not connect when component starts, just wait until we process as
we will
+ // connect at that time if needed
super.doStart();
}
@@ -141,4 +141,8 @@
return success;
}
+ private String remoteServer() {
+ return endpoint.getConfiguration().remoteServerInformation();
+ }
+
}
Added:
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpUtils.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpUtils.java?rev=680078&view=auto
==============================================================================
---
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpUtils.java
(added)
+++
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpUtils.java
Sun Jul 27 01:48:09 2008
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file.remote;
+
+import java.io.IOException;
+
+import org.apache.commons.net.ftp.FTPClient;
+
+/**
+ * Utility methods for FTP.
+ */
+public class FtpUtils {
+
+ private FtpUtils() {
+ }
+
+ public static void connect(FTPClient client, RemoteFileConfiguration
config) throws IOException {
+ String host = config.getHost();
+ int port = config.getPort();
+ String username = config.getUsername();
+
+ client.connect(host, port);
+ if (username != null) {
+ client.login(username, config.getPassword());
+ } else {
+ client.login("anonymous", null);
+ }
+ client.setFileType(config.isBinary() ? FTPClient.BINARY_FILE_TYPE :
FTPClient.ASCII_FILE_TYPE);
+ }
+
+ public static void disconnect(FTPClient client) throws IOException {
+ if (client.isConnected()) {
+ client.disconnect();
+ }
+ }
+
+ public static FTPClient createNewFtpClient() {
+ return new FTPClient();
+ }
+
+}
Modified:
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileBinding.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileBinding.java?rev=680078&r1=680077&r2=680078&view=diff
==============================================================================
---
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileBinding.java
(original)
+++
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileBinding.java
Sun Jul 27 01:48:09 2008
@@ -19,6 +19,7 @@
import java.io.OutputStream;
public class RemoteFileBinding {
+
public Object extractBodyFromOutputStream(RemoteFileExchange exchange,
OutputStream outputStream) {
return outputStream;
}
Modified:
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConverter.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConverter.java?rev=680078&r1=680077&r2=680078&view=diff
==============================================================================
---
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConverter.java
(original)
+++
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConverter.java
Sun Jul 27 01:48:09 2008
@@ -29,6 +29,7 @@
*/
@Converter
public final class RemoteFileConverter {
+
private RemoteFileConverter() {
// Helper Class
}
Modified:
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java?rev=680078&r1=680077&r2=680078&view=diff
==============================================================================
---
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java
(original)
+++
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java
Sun Jul 27 01:48:09 2008
@@ -55,6 +55,18 @@
this.session = session;
}
+ protected void doStart() throws Exception {
+ LOG.info("Starting");
+ super.doStart();
+ }
+
+ protected void doStop() throws Exception {
+ LOG.info("Stopping");
+ // disconnect when stopping
+ disconnect();
+ super.doStop();
+ }
+
protected void connectIfNecessary() throws JSchException {
if (channel == null || !channel.isConnected()) {
if (session == null || !session.isConnected()) {
@@ -65,17 +77,18 @@
LOG.debug("Channel isn't connected, trying to recreate and
connect.");
channel = endpoint.createChannelSftp(session);
channel.connect();
- LOG.info("Connected to " +
endpoint.getConfiguration().remoteServerInformation());
+ LOG.info("Connected to " + remoteServer());
}
}
protected void disconnect() throws JSchException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Disconnecting from " + remoteServer());
+ }
if (session != null) {
- LOG.debug("Session is being explicitly disconnected");
session.disconnect();
}
if (channel != null) {
- LOG.debug("Channel is being explicitly disconnected");
channel.disconnect();
}
}
@@ -92,29 +105,31 @@
if (endpoint.getConfiguration().isDirectory()) {
pollDirectory(fileName);
} else {
+ // TODO: This code could be neater
channel.cd(fileName.substring(0, fileName.lastIndexOf('/')));
final ChannelSftp.LsEntry file =
(ChannelSftp.LsEntry)channel.ls(fileName.substring(fileName.lastIndexOf('/') +
1)).get(0);
pollFile(file);
}
lastPollTime = System.currentTimeMillis();
- } catch (JSchException e) {
- // If the connection has gone stale, then we must manually
disconnect
- // the client before attempting to reconnect
- LOG.warn("Disconnecting due to exception: " + e.getMessage());
- disconnect();
- // Rethrow to signify that we didn't poll
- throw e;
- } catch (SftpException e) {
- // Still not sure if/when these come up and what we should do
about them
- // client.disconnect();
- LOG.warn("Caught SftpException:" + e.getMessage(), e);
- LOG.warn("Hoping an explicit disconnect/reconnect will solve the
problem");
- // Rethrow to signify that we didn't poll
- throw e;
+ } catch (Exception e) {
+ if (isStopping() || isStopped()) {
+ // if we are stopping then ignore any exception during a poll
+ LOG.warn( "Consumer is stopping. Ignoring caught exception: " +
+ e.getClass().getCanonicalName() + " message: " +
e.getMessage());
+ } else {
+ LOG.warn("Exception occured during polling: " +
+ e.getClass().getCanonicalName() + " message: " +
e.getMessage());
+ disconnect();
+ // Rethrow to signify that we didn't poll
+ throw e;
+ }
}
}
protected void pollDirectory(String dir) throws Exception {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Polling directory: " + dir);
+ }
String currentDir = channel.pwd();
channel.cd(dir);
@@ -147,7 +162,7 @@
// TODO do we need to adjust the TZ? can we?
if (ts > lastPollTime && isMatched(sftpFile)) {
- String remoteServer =
endpoint.getConfiguration().remoteServerInformation();
+ String fullFileName = getFullFileName(sftpFile);
// is we use excluse read then acquire the exclusive read (waiting
until we got it)
if (exclusiveRead) {
@@ -158,15 +173,18 @@
final ByteArrayOutputStream byteArrayOutputStream = new
ByteArrayOutputStream();
channel.get(sftpFile.getFilename(), byteArrayOutputStream);
if (LOG.isDebugEnabled()) {
- LOG.debug("Retrieved file: " + sftpFile.getFilename() + "
from: " + remoteServer);
+ LOG.debug("Retrieved file: " + sftpFile.getFilename() + "
from: " + remoteServer());
}
RemoteFileExchange exchange =
endpoint.createExchange(getFullFileName(sftpFile), byteArrayOutputStream);
if (isSetNames()) {
- String relativePath =
getFullFileName(sftpFile).substring(endpoint.getConfiguration().getFile().length());
- if (relativePath.startsWith("/")) {
- relativePath = relativePath.substring(1);
+ String ftpBasePath = endpoint.getConfiguration().getFile();
+ String relativePath =
fullFileName.substring(ftpBasePath.length() + 1);
+ relativePath = relativePath.replaceFirst("/", "");
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Setting exchange filename to " + relativePath);
}
exchange.getIn().setHeader(FileComponent.HEADER_FILE_NAME,
relativePath);
}
@@ -174,13 +192,13 @@
if (deleteFile) {
// delete file after consuming
if (LOG.isDebugEnabled()) {
- LOG.debug("Deleteing file: " + sftpFile.getFilename() + "
from: " + remoteServer);
+ LOG.debug("Deleteing file: " + sftpFile.getFilename() + "
from: " + remoteServer());
}
try {
channel.rm(sftpFile.getFilename());
} catch (SftpException e) {
// ignore just log a warning
- LOG.warn("Could not delete file: " +
sftpFile.getFilename() + " from: " + remoteServer);
+ LOG.warn("Could not delete file: " +
sftpFile.getFilename() + " from: " + remoteServer());
}
}
@@ -194,7 +212,7 @@
// the trick is to try to rename the file, if we can rename then we
have exclusive read
// since its a remote file we can not use java.nio to get a RW access
String originalName = sftpFile.getFilename();
- String newName = originalName + ".camel";
+ String newName = originalName + ".camelExclusiveRead";
boolean exclusive = false;
while (! exclusive) {
try {
@@ -221,6 +239,10 @@
}
}
+ private String remoteServer() {
+ return endpoint.getConfiguration().remoteServerInformation();
+ }
+
protected boolean isMatched(ChannelSftp.LsEntry sftpFile) {
boolean result = true;
if (regexPattern != null && regexPattern.length() > 0) {
Modified:
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpProducer.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpProducer.java?rev=680078&r1=680077&r2=680078&view=diff
==============================================================================
---
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpProducer.java
(original)
+++
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpProducer.java
Sun Jul 27 01:48:09 2008
@@ -41,6 +41,30 @@
this.session = session;
}
+ public void process(Exchange exchange) throws Exception {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Processing " + endpoint.getConfiguration());
+ }
+ connectIfNecessary();
+ // If the attempt to connect isn't successful, then the thrown
+ // exception will signify that we couldn't deliver
+ try {
+ process(endpoint.createExchange(exchange));
+ } catch (Exception e) {
+ if (isStopping() || isStopped()) {
+ // if we are stopping then ignore any exception during a poll
+ LOG.warn( "Producer is stopping. Ignoring caught exception: " +
+ e.getClass().getCanonicalName() + " message: " +
e.getMessage());
+ } else {
+ LOG.warn("Exception occured during processing: " +
+ e.getClass().getCanonicalName() + " message: " +
e.getMessage());
+ disconnect();
+ // Rethrow to signify that we didn't poll
+ throw e;
+ }
+ }
+ }
+
protected void connectIfNecessary() throws JSchException {
if (channel == null || !channel.isConnected()) {
if (session == null || !session.isConnected()) {
@@ -56,40 +80,17 @@
}
protected void disconnect() throws JSchException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Disconnecting from " + remoteServer());
+ }
if (session != null) {
- LOG.debug("Session is being explicitly disconnected");
session.disconnect();
}
if (channel != null) {
- LOG.debug("Channel is being explicitly disconnected");
channel.disconnect();
}
}
- public void process(Exchange exchange) throws Exception {
- connectIfNecessary();
- // If the attempt to connect isn't successful, then the thrown
- // exception will signify that we couldn't deliver
- try {
- process(endpoint.createExchange(exchange));
- } catch (JSchException e) {
- // If the connection has gone stale, then we must manually
disconnect
- // the client before attempting to reconnect
- LOG.warn("Disconnecting due to exception: " + e.getMessage());
- disconnect();
- // Rethrow to signify that we didn't deliver
- throw e;
- } catch (SftpException e) {
- // Still not sure if/when these come up and what we should do
about them
- // client.disconnect();
- LOG.warn("Caught SftpException:" + e.getMessage(), e);
- LOG.warn("Hoping an explicit disconnect/reconnect will solve the
problem");
- disconnect();
- // Rethrow to signify that we didn't deliver
- throw e;
- }
- }
-
public void process(RemoteFileExchange exchange) throws Exception {
InputStream payload = exchange.getIn().getBody(InputStream.class);
try {
@@ -107,9 +108,7 @@
channel.put(payload, fileName);
- if (LOG.isInfoEnabled()) {
- LOG.info("Sent: " + fileName + " to: " + remoteServer);
- }
+ LOG.info("Sent: " + fileName + " to: " + remoteServer);
} finally {
if (payload != null) {
payload.close();
@@ -120,11 +119,8 @@
@Override
protected void doStart() throws Exception {
LOG.info("Starting");
- try {
- connectIfNecessary();
- } catch (JSchException e) {
- LOG.warn("Couldn't connect to: " +
endpoint.getConfiguration().remoteServerInformation());
- }
+ // do not connect when component starts, just wait until we process as
we will
+ // connect at that time if needed
super.doStart();
}
@@ -164,4 +160,8 @@
return success;
}
+ private String remoteServer() {
+ return endpoint.getConfiguration().remoteServerInformation();
+ }
+
}
\ No newline at end of file
Modified:
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpExclusiveReadTest.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpExclusiveReadTest.java?rev=680078&r1=680077&r2=680078&view=diff
==============================================================================
---
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpExclusiveReadTest.java
(original)
+++
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpExclusiveReadTest.java
Sun Jul 27 01:48:09 2008
@@ -31,7 +31,7 @@
private static final Log LOG =
LogFactory.getLog(FromFtpExclusiveReadTest.class);
- private String port = "20019";
+ private String port = "20025";
private String ftpUrl = "ftp://[EMAIL PROTECTED]:" + port +
"/slowfile?password=admin&binary=false&consumer.exclusiveRead=true&consumer.delay=500";
public String getPort() {
Copied:
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpNonExclusiveReadTest.java
(from r680012,
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpExclusiveReadTest.java)
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpNonExclusiveReadTest.java?p2=activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpNonExclusiveReadTest.java&p1=activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpExclusiveReadTest.java&r1=680012&r2=680078&rev=680078&view=diff
==============================================================================
---
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpExclusiveReadTest.java
(original)
+++
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpNonExclusiveReadTest.java
Sun Jul 27 01:48:09 2008
@@ -25,14 +25,14 @@
import org.apache.commons.logging.LogFactory;
/**
- * Unit test to verify exclusive read - that we do not poll files that is in
progress of being written.
+ * Unit test to verify *NON* exclusive read.
*/
-public class FromFtpExclusiveReadTest extends FtpServerTestSupport {
+public class FromFtpNonExclusiveReadTest extends FtpServerTestSupport {
private static final Log LOG =
LogFactory.getLog(FromFtpExclusiveReadTest.class);
- private String port = "20019";
- private String ftpUrl = "ftp://[EMAIL PROTECTED]:" + port +
"/slowfile?password=admin&binary=false&consumer.exclusiveRead=true&consumer.delay=500";
+ private String port = "20027";
+ private String ftpUrl = "ftp://[EMAIL PROTECTED]:" + port +
"/slowfile?password=admin&binary=false&consumer.exclusiveRead=false&consumer.delay=500";
public String getPort() {
return port;
@@ -43,11 +43,15 @@
createDirectory("./res/home/slowfile");
MockEndpoint mock = getMockEndpoint("mock:result");
mock.expectedMessageCount(1);
- mock.expectedBodiesReceived("Hello WorldLine #0Line #1Line #2Bye
World");
createSlowFile();
mock.assertIsSatisfied();
+
+ // we read only part of the file as we don't have exclusive read and
thus read part of the
+ // file currently in progress of being written - so we get only the
Hello World part
+ String body = mock.getExchanges().get(0).getIn().getBody(String.class);
+ assertFalse("Should not get the entire file", body.endsWith("Bye
World"));
}
private void createSlowFile() throws Exception {
@@ -77,4 +81,4 @@
File file = new File(s);
file.mkdirs();
}
-}
+}
\ No newline at end of file
Copied:
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpPollFileOnlyTest.java
(from r679955,
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpToAsciiFileTest.java)
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpPollFileOnlyTest.java?p2=activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpPollFileOnlyTest.java&p1=activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpToAsciiFileTest.java&r1=679955&r2=680078&rev=680078&view=diff
==============================================================================
---
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpToAsciiFileTest.java
(original)
+++
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpPollFileOnlyTest.java
Sun Jul 27 01:48:09 2008
@@ -16,8 +16,6 @@
*/
package org.apache.camel.component.file.remote;
-import java.io.File;
-
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Producer;
@@ -26,29 +24,19 @@
import org.apache.camel.component.mock.MockEndpoint;
/**
- * Unit test to verify that we can pool an ASCII file from the FTP Server and
store it on a local file path
+ * Unit test to poll a file from the FTP server and not a folder as most tests
do.
*/
-public class FromFtpToAsciiFileTest extends FtpServerTestSupport {
+public class FromFtpPollFileOnlyTest extends FtpServerTestSupport {
- private String port = "20013";
- private String ftpUrl = "ftp://[EMAIL PROTECTED]:" + port +
"/tmp3/camel?password=admin&binary=false";
+ private String port = "20028";
+ private String ftpUrl = "ftp://[EMAIL PROTECTED]:" + port +
"/fileonly/report.txt?password=admin&directory=false";
- public void testFtpRoute() throws Exception {
- MockEndpoint resultEndpoint = getMockEndpoint("mock:result");
- resultEndpoint.expectedMinimumMessageCount(1);
- resultEndpoint.expectedBodiesReceived("Hello World from FTPServer");
- resultEndpoint.assertIsSatisfied();
-
- // wait until the file producer has written the file
- Thread.sleep(1000);
-
- // assert the file
- File file = new File("target/ftptest/deleteme.txt");
- assertTrue("The ASCII file should exists", file.exists());
- assertTrue("File size wrong", file.length() > 10);
+ public void testPollFileOnly() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedMinimumMessageCount(1);
+ mock.expectedBodiesReceived("Hello World from FTPServer");
- // let some time pass to let the consumer etc. properly do its
business before closing
- Thread.sleep(1000);
+ mock.assertIsSatisfied();
}
public String getPort() {
@@ -64,10 +52,10 @@
private void prepareFtpServer() throws Exception {
// prepares the FTP Server by creating a file on the server that we
want to unit
// test that we can pool and store as a local file
- Endpoint endpoint = context.getEndpoint(ftpUrl);
+ Endpoint endpoint = context.getEndpoint("ftp://[EMAIL PROTECTED]:" +
port + "/fileonly/?password=admin&binary=false");
Exchange exchange = endpoint.createExchange();
exchange.getIn().setBody("Hello World from FTPServer");
- exchange.getIn().setHeader(FileComponent.HEADER_FILE_NAME,
"hello.txt");
+ exchange.getIn().setHeader(FileComponent.HEADER_FILE_NAME,
"report.txt");
Producer producer = endpoint.createProducer();
producer.start();
producer.process(exchange);
@@ -77,12 +65,9 @@
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
public void configure() throws Exception {
- String fileUrl = "file:target/ftptest/?append=false&noop=true";
- from(ftpUrl).setHeader(FileComponent.HEADER_FILE_NAME,
constant("deleteme.txt")).
-
convertBodyTo(String.class).to(fileUrl).to("mock:result");
+ from(ftpUrl).to("mock:result");
}
};
}
-}
-
+}
\ No newline at end of file
Propchange:
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpPollFileOnlyTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpPollFileOnlyTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified:
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpProducerBuildDirectoryTest.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpProducerBuildDirectoryTest.java?rev=680078&r1=680077&r2=680078&view=diff
==============================================================================
---
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpProducerBuildDirectoryTest.java
(original)
+++
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpProducerBuildDirectoryTest.java
Sun Jul 27 01:48:09 2008
@@ -27,7 +27,7 @@
*/
public class FtpProducerBuildDirectoryTest extends FtpServerTestSupport {
- private String port = "20018";
+ private String port = "20026";
private String ftpUrl = "ftp://[EMAIL PROTECTED]:" + port +
"/upload/user/claus?binary=false&password=admin";
public String getPort() {
Modified:
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpServerTestSupport.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpServerTestSupport.java?rev=680078&r1=680077&r2=680078&view=diff
==============================================================================
---
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpServerTestSupport.java
(original)
+++
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpServerTestSupport.java
Sun Jul 27 01:48:09 2008
@@ -42,6 +42,7 @@
protected void tearDown() throws Exception {
super.tearDown();
// must stop server after super to let the clients stop correctly
(CAMEL-444)
+ ftpServer.getServerContext().dispose();
ftpServer.stop();
}
Modified:
activemq/camel/trunk/components/camel-ftp/src/test/resources/log4j.properties
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ftp/src/test/resources/log4j.properties?rev=680078&r1=680077&r2=680078&view=diff
==============================================================================
---
activemq/camel/trunk/components/camel-ftp/src/test/resources/log4j.properties
(original)
+++
activemq/camel/trunk/components/camel-ftp/src/test/resources/log4j.properties
Sun Jul 27 01:48:09 2008
@@ -21,7 +21,7 @@
log4j.rootLogger=INFO, file
# uncomment the following to enable camel debugging
-log4j.logger.org.apache.camel.component.file=DEBUG
+log4j.logger.org.apache.camel.component.file=TRACE
log4j.logger.org.apache.mina=WARN
log4j.logger.org.apache.ftpserver=WARN