This is an automated email from the ASF dual-hosted git repository. valdar pushed a commit to branch camel-2.x in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-2.x by this push: new 4f46955 Backport of CAMEL-13679: fixed Camel-FTP component does not set CamelFtpReplyCode in some case(ex 530). new a03a7be Merge pull request #3364 from valdar/CAMEL-13679 4f46955 is described below commit 4f469551007760b4a22ef4744f8e65abb4eb9b07 Author: Andrea Tarocchi <andrea.taroc...@gmail.com> AuthorDate: Wed Nov 27 12:39:50 2019 +0100 Backport of CAMEL-13679: fixed Camel-FTP component does not set CamelFtpReplyCode in some case(ex 530). --- .../camel/component/file/GenericFileProducer.java | 4 +- .../camel/component/file/remote/FtpOperations.java | 18 ++++++-- .../component/file/remote/FtpsOperations.java | 11 ++++- .../component/file/remote/RemoteFileConsumer.java | 2 +- .../file/remote/RemoteFileOperations.java | 4 +- .../component/file/remote/RemoteFileProducer.java | 8 ++-- .../component/file/remote/SftpOperations.java | 14 +++---- ...ava => FtpProducerConnectErrorsHeaderTest.java} | 48 ++++++++-------------- .../file/remote/sftp/SftpECKeyFileConsumeTest.java | 2 +- .../apache/camel/component/scp/ScpOperations.java | 2 +- 10 files changed, 59 insertions(+), 54 deletions(-) diff --git a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileProducer.java b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileProducer.java index df52f68..bb11bce 100644 --- a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileProducer.java +++ b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileProducer.java @@ -111,7 +111,7 @@ public class GenericFileProducer<T> extends DefaultProducer { log.trace("Processing file: {} for exchange: {}", target, exchange); try { - preWriteCheck(); + preWriteCheck(exchange); // should we write to a temporary name and then afterwards rename to real target boolean writeAsTempAndRename = ObjectHelper.isNotEmpty(endpoint.getTempFileName()); @@ -269,7 +269,7 @@ public class GenericFileProducer<T> extends DefaultProducer { /** * Perform any actions that need to occur before we write such as connecting to an FTP server etc. */ - public void preWriteCheck() throws Exception { + public void preWriteCheck(Exchange exchange) throws Exception { // nothing needed to check } diff --git a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpOperations.java b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpOperations.java index 10f507c..e48d07c 100644 --- a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpOperations.java +++ b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpOperations.java @@ -78,18 +78,18 @@ public class FtpOperations implements RemoteFileOperations<FTPFile> { this.clientActivityListener = clientActivityListener; } - public boolean connect(RemoteFileConfiguration configuration) throws GenericFileOperationFailedException { + public boolean connect(RemoteFileConfiguration configuration, Exchange exchange) throws GenericFileOperationFailedException { client.setCopyStreamListener(clientActivityListener); try { - return doConnect(configuration); + return doConnect(configuration, exchange); } catch (GenericFileOperationFailedException e) { clientActivityListener.onGeneralError(endpoint.getConfiguration().remoteServerInformation(), e.getMessage()); throw e; } } - protected boolean doConnect(RemoteFileConfiguration configuration) throws GenericFileOperationFailedException { + protected boolean doConnect(RemoteFileConfiguration configuration, Exchange exchange) throws GenericFileOperationFailedException { log.trace("Connecting using FTPClient: {}", client); String host = configuration.getHost(); @@ -162,6 +162,12 @@ public class FtpOperations implements RemoteFileOperations<FTPFile> { throw new GenericFileOperationFailedException("Interrupted during sleeping", ie); } } + } finally { + if (exchange != null) { + // store client reply information after the operation + exchange.getIn().setHeader(FtpConstants.FTP_REPLY_CODE, client.getReplyCode()); + exchange.getIn().setHeader(FtpConstants.FTP_REPLY_STRING, client.getReplyString()); + } } } @@ -222,6 +228,12 @@ public class FtpOperations implements RemoteFileOperations<FTPFile> { client.setFileType(configuration.isBinary() ? FTP.BINARY_FILE_TYPE : FTP.ASCII_FILE_TYPE); } catch (IOException e) { throw new GenericFileOperationFailedException(client.getReplyCode(), client.getReplyString(), e.getMessage(), e); + } finally { + if (exchange != null) { + // store client reply information after the operation + exchange.getIn().setHeader(FtpConstants.FTP_REPLY_CODE, client.getReplyCode()); + exchange.getIn().setHeader(FtpConstants.FTP_REPLY_STRING, client.getReplyString()); + } } // site commands diff --git a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpsOperations.java b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpsOperations.java index 1f16524..a9425e6 100644 --- a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpsOperations.java +++ b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpsOperations.java @@ -20,6 +20,7 @@ import java.io.IOException; import javax.net.ssl.SSLException; +import org.apache.camel.Exchange; import org.apache.camel.component.file.GenericFileOperationFailedException; import org.apache.camel.util.ObjectHelper; import org.apache.commons.net.ftp.FTPClientConfig; @@ -37,8 +38,8 @@ public class FtpsOperations extends FtpOperations { } @Override - public boolean connect(RemoteFileConfiguration configuration) throws GenericFileOperationFailedException { - boolean answer = super.connect(configuration); + public boolean connect(RemoteFileConfiguration configuration, Exchange exchange) throws GenericFileOperationFailedException { + boolean answer = super.connect(configuration, exchange); FtpsConfiguration config = (FtpsConfiguration) configuration; if (answer) { @@ -69,6 +70,12 @@ public class FtpsOperations extends FtpOperations { } catch (IOException e) { throw new GenericFileOperationFailedException(client.getReplyCode(), client.getReplyString(), e.getMessage(), e); + } finally { + if (exchange != null) { + // store client reply information after the operation + exchange.getIn().setHeader(FtpConstants.FTP_REPLY_CODE, client.getReplyCode()); + exchange.getIn().setHeader(FtpConstants.FTP_REPLY_STRING, client.getReplyString()); + } } } diff --git a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java index 8c18751..46bc7b9 100644 --- a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java +++ b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java @@ -194,7 +194,7 @@ public abstract class RemoteFileConsumer<T> extends GenericFileConsumer<T> { if (log.isDebugEnabled()) { log.debug("Not connected/logged in, connecting to: {}", remoteServer()); } - loggedIn = getOperations().connect((RemoteFileConfiguration) endpoint.getConfiguration()); + loggedIn = getOperations().connect((RemoteFileConfiguration) endpoint.getConfiguration(), null); if (loggedIn) { log.debug("Connected and logged in to: {}", remoteServer()); } diff --git a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileOperations.java b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileOperations.java index 7136682..a84ce99 100644 --- a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileOperations.java +++ b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileOperations.java @@ -16,6 +16,7 @@ */ package org.apache.camel.component.file.remote; +import org.apache.camel.Exchange; import org.apache.camel.component.file.GenericFileOperationFailedException; import org.apache.camel.component.file.GenericFileOperations; @@ -28,10 +29,11 @@ public interface RemoteFileOperations<T> extends GenericFileOperations<T> { * Connects to the remote server * * @param configuration configuration + * @param exchange the exchange that trigger the connect (if any) * @return <tt>true</tt> if connected * @throws GenericFileOperationFailedException can be thrown */ - boolean connect(RemoteFileConfiguration configuration) throws GenericFileOperationFailedException; + boolean connect(RemoteFileConfiguration configuration, Exchange exchange) throws GenericFileOperationFailedException; /** * Returns whether we are connected to the remote server or not diff --git a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileProducer.java b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileProducer.java index 38ccdd1..23fd0e1 100644 --- a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileProducer.java +++ b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileProducer.java @@ -103,7 +103,7 @@ public class RemoteFileProducer<T> extends GenericFileProducer<T> implements Ser } @Override - public void preWriteCheck() throws Exception { + public void preWriteCheck(Exchange exchange) throws Exception { // before writing send a noop to see if the connection is alive and works boolean noop = false; if (loggedIn) { @@ -127,7 +127,7 @@ public class RemoteFileProducer<T> extends GenericFileProducer<T> implements Ser // if not alive then reconnect if (!noop) { try { - connectIfNecessary(); + connectIfNecessary(exchange); } catch (Exception e) { loggedIn = false; @@ -173,11 +173,11 @@ public class RemoteFileProducer<T> extends GenericFileProducer<T> implements Ser super.doStop(); } - protected void connectIfNecessary() throws GenericFileOperationFailedException { + protected void connectIfNecessary(Exchange exchange) throws GenericFileOperationFailedException { if (!loggedIn || !getOperations().isConnected()) { log.debug("Not already connected/logged in. Connecting to: {}", getEndpoint()); RemoteFileConfiguration config = getEndpoint().getConfiguration(); - loggedIn = getOperations().connect(config); + loggedIn = getOperations().connect(config, exchange); if (!loggedIn) { return; } diff --git a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpOperations.java b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpOperations.java index 3974aff..9f16dcf 100644 --- a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpOperations.java +++ b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpOperations.java @@ -98,7 +98,7 @@ public class SftpOperations implements RemoteFileOperations<SftpRemoteFile> { this.endpoint = (SftpEndpoint)endpoint; } - public synchronized boolean connect(RemoteFileConfiguration configuration) throws GenericFileOperationFailedException { + public synchronized boolean connect(RemoteFileConfiguration configuration, Exchange exchange) throws GenericFileOperationFailedException { if (isConnected()) { // already connected return true; @@ -465,16 +465,16 @@ public class SftpOperations implements RemoteFileOperations<SftpRemoteFile> { } } - private void reconnectIfNecessary() { + private void reconnectIfNecessary(Exchange exchange) { if (!isConnected()) { - connect(endpoint.getConfiguration()); + connect(endpoint.getConfiguration(), exchange); } } public synchronized boolean deleteFile(String name) throws GenericFileOperationFailedException { LOG.debug("Deleting file: {}", name); try { - reconnectIfNecessary(); + reconnectIfNecessary(null); channel.rm(name); return true; } catch (SftpException e) { @@ -486,7 +486,7 @@ public class SftpOperations implements RemoteFileOperations<SftpRemoteFile> { public synchronized boolean renameFile(String from, String to) throws GenericFileOperationFailedException { LOG.debug("Renaming file: {} to: {}", from, to); try { - reconnectIfNecessary(); + reconnectIfNecessary(null); // make use of the '/' separator because JSch expects this // as the file separator even on Windows to = FileUtil.compactPath(to, '/'); @@ -620,10 +620,10 @@ public class SftpOperations implements RemoteFileOperations<SftpRemoteFile> { doChangeDirectory(path.substring(0, 1)); path = path.substring(1); } else { - if(path.matches("^[a-zA-Z]:(//).*$")) { + if (path.matches("^[a-zA-Z]:(//).*$")) { doChangeDirectory(path.substring(0, 3)); path = path.substring(3); - } else if(path.matches("^[a-zA-Z]:(\\\\).*$")) { + } else if (path.matches("^[a-zA-Z]:(\\\\).*$")) { doChangeDirectory(path.substring(0, 4)); path = path.substring(4); } diff --git a/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpECKeyFileConsumeTest.java b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpProducerConnectErrorsHeaderTest.java similarity index 50% copy from components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpECKeyFileConsumeTest.java copy to components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpProducerConnectErrorsHeaderTest.java index cb4ab0c..9c3a274 100644 --- a/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpECKeyFileConsumeTest.java +++ b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpProducerConnectErrorsHeaderTest.java @@ -1,4 +1,4 @@ -/* +/** * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -14,53 +14,37 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.camel.component.file.remote.sftp; +package org.apache.camel.component.file.remote; -import java.security.interfaces.ECPublicKey; - -import org.apache.camel.Exchange; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; -import org.apache.sshd.server.auth.pubkey.PublickeyAuthenticator; import org.junit.Test; -public class SftpECKeyFileConsumeTest extends SftpServerTestSupport { - - @Test - public void testSftpSimpleConsume() throws Exception { - if (!canTest()) { - return; - } +public class FtpProducerConnectErrorsHeaderTest extends FtpServerTestSupport { - String expected = "Hello World"; - - // create file using regular file - template.sendBodyAndHeader("file://" + FTP_ROOT_DIR, expected, Exchange.FILE_NAME, "hello.txt"); + private String getFtpUrl() { + return "ftp://xxx@localhost:" + getPort() + "/tmp4/camel?password=xxx&consumer.delay=5000"; + } + @Test + public void testConsumerConnectErrorsHeader() throws Exception { MockEndpoint mock = getMockEndpoint("mock:result"); mock.expectedMessageCount(1); - mock.expectedHeaderReceived(Exchange.FILE_NAME, "hello.txt"); - mock.expectedBodiesReceived(expected); - - context.getRouteController().startRoute("foo"); + mock.expectedHeaderReceived("CamelFtpReplyCode", "530"); + template.sendBody("direct:start", "hi"); assertMockEndpointsSatisfied(); } - @Override - protected PublickeyAuthenticator getPublickeyAuthenticator() { - return (username, key, session) -> key instanceof ECPublicKey; - } - - @Override protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { - @Override public void configure() throws Exception { - from("sftp://localhost:" + getPort() + "/" + FTP_ROOT_DIR - + "?username=admin&knownHostsFile=./src/test/resources/known_hosts&privateKeyFile=./src/test/resources/ec.pem&delay=10s&disconnect=true") - .routeId("foo").noAutoStartup() - .to("mock:result"); + from("direct:start") + .doTry() + .to(getFtpUrl()) + .doCatch(Exception.class) + .to("mock:result") + .endDoTry(); } }; } diff --git a/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpECKeyFileConsumeTest.java b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpECKeyFileConsumeTest.java index cb4ab0c..7e753d3 100644 --- a/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpECKeyFileConsumeTest.java +++ b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpECKeyFileConsumeTest.java @@ -1,4 +1,4 @@ -/* +/** * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. diff --git a/components/camel-jsch/src/main/java/org/apache/camel/component/scp/ScpOperations.java b/components/camel-jsch/src/main/java/org/apache/camel/component/scp/ScpOperations.java index 9bc404d..86b2bdd 100644 --- a/components/camel-jsch/src/main/java/org/apache/camel/component/scp/ScpOperations.java +++ b/components/camel-jsch/src/main/java/org/apache/camel/component/scp/ScpOperations.java @@ -174,7 +174,7 @@ public class ScpOperations implements RemoteFileOperations<ScpFile> { } @Override - public boolean connect(RemoteFileConfiguration configuration) throws GenericFileOperationFailedException { + public boolean connect(RemoteFileConfiguration configuration, Exchange exchange) throws GenericFileOperationFailedException { if (!isConnected()) { session = createSession(configuration instanceof ScpConfiguration ? (ScpConfiguration)configuration : null); // TODO: deal with reconnection attempts