Repository: camel Updated Branches: refs/heads/camel-2.18.x 96aa5be3e -> 0f3a0c2d6
CAMEL-10712: Camel-SFTP endpoints will silently not delete file on disconnect Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/0f3a0c2d Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/0f3a0c2d Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/0f3a0c2d Branch: refs/heads/camel-2.18.x Commit: 0f3a0c2d68fbd685c6dbd6923cca1aeb82ef81fa Parents: 96aa5be Author: Stephan Siano <[email protected]> Authored: Tue Jan 17 08:27:59 2017 +0100 Committer: Stephan Siano <[email protected]> Committed: Tue Jan 17 09:02:02 2017 +0100 ---------------------------------------------------------------------- .../component/file/remote/SftpOperations.java | 10 ++ .../remote/sftp/SftpConsumerDisconnectTest.java | 141 +++++++++++++++++++ .../file/remote/sftp/SftpServerTestSupport.java | 11 ++ 3 files changed, 162 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/0f3a0c2d/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpOperations.java ---------------------------------------------------------------------- diff --git a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpOperations.java b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpOperations.java index c502f5b..bb0ffba 100644 --- a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpOperations.java +++ b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpOperations.java @@ -419,12 +419,20 @@ public class SftpOperations implements RemoteFileOperations<ChannelSftp.LsEntry> } } + private void reconnectIfNecessary() { + if (!isConnected()) { + connect((RemoteFileConfiguration) endpoint.getConfiguration()); + } + } + public synchronized boolean deleteFile(String name) throws GenericFileOperationFailedException { LOG.debug("Deleting file: {}", 
name); try { + reconnectIfNecessary(); channel.rm(name); return true; } catch (SftpException e) { + LOG.warn("Cannot delete file: " + name, e); throw new GenericFileOperationFailedException("Cannot delete file: " + name, e); } } @@ -432,9 +440,11 @@ public class SftpOperations implements RemoteFileOperations<ChannelSftp.LsEntry> public synchronized boolean renameFile(String from, String to) throws GenericFileOperationFailedException { LOG.debug("Renaming file: {} to: {}", from, to); try { + reconnectIfNecessary(); channel.rename(from, to); return true; } catch (SftpException e) { + LOG.warn("Cannot rename file from: " + from + " to: " + to, e); throw new GenericFileOperationFailedException("Cannot rename file from: " + from + " to: " + to, e); } } http://git-wip-us.apache.org/repos/asf/camel/blob/0f3a0c2d/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpConsumerDisconnectTest.java ---------------------------------------------------------------------- diff --git a/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpConsumerDisconnectTest.java b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpConsumerDisconnectTest.java new file mode 100644 index 0000000..e3e540c --- /dev/null +++ b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpConsumerDisconnectTest.java @@ -0,0 +1,141 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.file.remote.sftp; + +import java.io.File; +import java.io.IOException; + +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.commons.io.FileUtils; +import org.junit.Before; +import org.junit.Test; + +public class SftpConsumerDisconnectTest extends SftpServerTestSupport { + private static final String SAMPLE_FILE_NAME_1 = String.format("sample-1-%s.txt", SftpConsumerDisconnectTest.class.getSimpleName()); + private static final String SAMPLE_FILE_NAME_2 = String.format("sample-2-%s.txt", SftpConsumerDisconnectTest.class.getSimpleName()); + private static final String SAMPLE_FILE_CHARSET = "iso-8859-1"; + private static final String SAMPLE_FILE_PAYLOAD = "abc"; + + @Before + public void setUp() throws Exception { + super.setUp(); + context.stopRoute("foo"); + context.stopRoute("bar"); + } + + @Test + public void testConsumeDelete() throws Exception { + if (!canTest()) { + return; + } + + // prepare sample file to be consumed by SFTP consumer + createSampleFile(SAMPLE_FILE_NAME_1); + + // Prepare expectations + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedMessageCount(1); + mock.expectedBodiesReceived(SAMPLE_FILE_PAYLOAD); + + context.startRoute("foo"); + + // Check that expectations are satisfied + assertMockEndpointsSatisfied(); + + // File is deleted + assertTrue(fileRemovedEventually(FTP_ROOT_DIR + "/" + SAMPLE_FILE_NAME_1)); + } + + public boolean 
fileRemovedEventually(String fileName) throws InterruptedException { + // try up to 10 seconds + for (int i = 0; i < 10; i++) { + // Give it a second to delete the file + Thread.sleep(1000); + + // fileName is already the full path (callers prepend FTP_ROOT_DIR) + File file = new File(fileName); + if (!file.exists()) { + return true; + } + } + + return false; + } + + @Test + public void testConsumeMove() throws Exception { + if (!canTest()) { + return; + } + + // prepare sample file to be consumed by SFTP consumer + createSampleFile(SAMPLE_FILE_NAME_2); + + // Prepare expectations + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedMessageCount(1); + mock.expectedBodiesReceived(SAMPLE_FILE_PAYLOAD); + + context.startRoute("bar"); + + // Check that expectations are satisfied + assertMockEndpointsSatisfied(); + + // give it a second to move the file + Thread.sleep(1000); + + // File is moved + assertTrue(fileRemovedEventually(FTP_ROOT_DIR + "/" + SAMPLE_FILE_NAME_2)); + File file = new File(FTP_ROOT_DIR + "/.camel/" + SAMPLE_FILE_NAME_2); + assertTrue(file.exists()); + file.delete(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("sftp://localhost:" + getPort() + "/" + FTP_ROOT_DIR + "?username=admin&password=admin&delete=true").routeId("foo").noAutoStartup().process(new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + disconnectAllSessions(); // disconnect all Sessions from + // the SFTP server + } + }).to("mock:result"); + from("sftp://localhost:" + getPort() + "/" + FTP_ROOT_DIR + "?username=admin&password=admin&noop=false&move=.camel").routeId("bar").noAutoStartup() + .process(new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + disconnectAllSessions(); // disconnect all Sessions + // from the SFTP server + } + }).to("mock:result"); + } + }; + } + + private 
void createSampleFile(String fileName) throws IOException { + File file = new File(FTP_ROOT_DIR + "/" + fileName); + + FileUtils.write(file, SAMPLE_FILE_PAYLOAD, SAMPLE_FILE_CHARSET); + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/0f3a0c2d/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpServerTestSupport.java ---------------------------------------------------------------------- diff --git a/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpServerTestSupport.java b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpServerTestSupport.java index 1e323ac..7d20986 100644 --- a/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpServerTestSupport.java +++ b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/sftp/SftpServerTestSupport.java @@ -17,9 +17,11 @@ package org.apache.camel.component.file.remote.sftp; import java.io.File; +import java.io.IOException; import java.security.NoSuchAlgorithmException; import java.security.PublicKey; import java.util.Arrays; +import java.util.List; import org.apache.camel.component.file.remote.BaseServerTestSupport; import org.apache.camel.util.ObjectHelper; @@ -27,6 +29,7 @@ import org.apache.commons.io.FileUtils; import org.apache.sshd.SshServer; import org.apache.sshd.common.NamedFactory; import org.apache.sshd.common.keyprovider.FileKeyPairProvider; +import org.apache.sshd.common.session.AbstractSession; import org.apache.sshd.server.Command; import org.apache.sshd.server.PublickeyAuthenticator; import org.apache.sshd.server.command.ScpCommandFactory; @@ -132,4 +135,12 @@ public class SftpServerTestSupport extends BaseServerTestSupport { protected boolean canTest() { return canTest; } + + // disconnect all existing SSH sessions to test reconnect functionality + protected void disconnectAllSessions() throws IOException { + List<AbstractSession> sessions = 
sshd.getActiveSessions(); + for (AbstractSession session : sessions) { + session.disconnect(4, "dummy"); + } + } }
