This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new b5f0b43fcb [Improve][Connector-V2] Ensure that the FTP connector
behaves reliably during directory operation (#8959)
b5f0b43fcb is described below
commit b5f0b43fcb6d1700014645a088cefecd333cb959
Author: corgy-w <[email protected]>
AuthorDate: Thu Mar 13 15:20:34 2025 +0800
[Improve][Connector-V2] Ensure that the FTP connector behaves reliably
during directory operation (#8959)
---
.../connector-file/connector-file-ftp/pom.xml | 10 +
.../file/ftp/system/SeaTunnelFTPFileSystem.java | 130 +++++++---
.../ftp/system/SeaTunnelFTPFileSystemTest.java | 268 +++++++++++++++++++++
3 files changed, 372 insertions(+), 36 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-ftp/pom.xml
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/pom.xml
index 0ab14d7e04..a3b84b84c5 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-ftp/pom.xml
+++ b/seatunnel-connectors-v2/connector-file/connector-file-ftp/pom.xml
@@ -29,6 +29,10 @@
<artifactId>connector-file-ftp</artifactId>
<name>SeaTunnel : Connectors V2 : File : Ftp</name>
+ <properties>
+ <mockftpserver.version>3.1.0</mockftpserver.version>
+ </properties>
+
<dependencies>
<dependency>
<groupId>org.apache.seatunnel</groupId>
@@ -48,6 +52,12 @@
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>org.mockftpserver</groupId>
+ <artifactId>MockFtpServer</artifactId>
+ <version>${mockftpserver.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/system/SeaTunnelFTPFileSystem.java
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/system/SeaTunnelFTPFileSystem.java
index 963d18c703..51d10d4e97 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/system/SeaTunnelFTPFileSystem.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/system/SeaTunnelFTPFileSystem.java
@@ -127,13 +127,19 @@ public class SeaTunnelFTPFileSystem extends FileSystem {
* @throws IOException IOException
*/
private FTPClient connect() throws IOException {
- FTPClient client = null;
+ FTPClient client = new FTPClient();
Configuration conf = getConf();
+ // Get the connection mode from configuration, default to
active_local mode
+ String connectionMode =
+ conf.get(FS_FTP_CONNECTION_MODE,
FtpConnectionMode.ACTIVE_LOCAL.getMode());
+
+ // Retrieve host, port, user, and password from configuration
String host = conf.get(FS_FTP_HOST);
int port = conf.getInt(FS_FTP_HOST_PORT, FTP.DEFAULT_PORT);
String user = conf.get(FS_FTP_USER_PREFIX + host);
String password = conf.get(FS_FTP_PASSWORD_PREFIX + host);
- client = new FTPClient();
+
+ // Connect to the FTP server
client.connect(host, port);
int reply = client.getReplyCode();
if (!FTPReply.isPositiveCompletion(reply)) {
@@ -143,23 +149,29 @@ public class SeaTunnelFTPFileSystem extends FileSystem {
NetUtils.UNKNOWN_HOST,
0,
new ConnectException("Server response " + reply));
- } else if (client.login(user, password)) {
- client.setFileTransferMode(FTP.BLOCK_TRANSFER_MODE);
- client.setFileType(FTP.BINARY_FILE_TYPE);
- client.setBufferSize(DEFAULT_BUFFER_SIZE);
- } else {
+ }
+
+ // Log in to the FTP server
+ if (!client.login(user, password)) {
throw new IOException(
- "Login failed on server - "
- + host
- + ", port - "
- + port
- + " as user '"
- + user
- + "'");
+ String.format(
+ "Login failed on server - %s, port - %d as user
'%s', reply code: %d",
+ host, port, user, client.getReplyCode()));
}
- setFsFtpConnectionMode(
- client, conf.get(FS_FTP_CONNECTION_MODE,
FtpConnectionMode.ACTIVE_LOCAL.getMode()));
+ // Set binary file type, buffer size, and block transfer mode
+ client.setFileType(FTP.BINARY_FILE_TYPE);
+ client.setBufferSize(DEFAULT_BUFFER_SIZE);
+ client.setFileTransferMode(FTP.BLOCK_TRANSFER_MODE);
+
+ // Set the connection mode
+ setFsFtpConnectionMode(client, connectionMode);
+
+ // Log successful connection information
+ LOG.info(
+ String.format(
+ "Successfully connected to FTP server %s:%d in %s",
+ host, port, connectionMode));
return client;
}
@@ -170,13 +182,39 @@ public class SeaTunnelFTPFileSystem extends FileSystem {
* @param client FTPClient
* @param mode mode
*/
- private void setFsFtpConnectionMode(FTPClient client, String mode) {
- switch (FtpConnectionMode.fromMode(mode)) {
+ private void setFsFtpConnectionMode(FTPClient client, String mode) throws
IOException {
+ FtpConnectionMode connectionMode = FtpConnectionMode.fromMode(mode);
+ switch (connectionMode) {
case PASSIVE_LOCAL:
client.enterLocalPassiveMode();
+ LOG.info("Using passive mode for FTP connection");
break;
case ACTIVE_LOCAL:
- client.enterLocalActiveMode();
+ // Create a test directory to check if active mode is working
+ String pathName = "/.ftptest" + System.currentTimeMillis();
+ try {
+ client.enterLocalActiveMode();
+ // test whether active mode is working by creating a directory
+ boolean created = client.makeDirectory(pathName);
+ if (!created) {
+ LOG.warn("Active mode failed, switching to passive
mode");
+ throw new IOException("FTP connection active mode test
failed");
+ }
+
+ LOG.info("Using active mode for FTP connection");
+ } catch (IOException e) {
+ // if active mode failed, switch to passive mode
+ client.enterLocalPassiveMode();
+ // update the connection mode to passive mode
+ getConf()
+ .set(FS_FTP_CONNECTION_MODE,
FtpConnectionMode.PASSIVE_LOCAL.getMode());
+ } finally {
+ // delete the test directory if it was created
+ FTPFile[] files = client.listFiles(pathName);
+ if (files != null && files.length > 0) {
+ client.deleteFile(pathName);
+ }
+ }
break;
default:
log.warn(
@@ -548,30 +586,50 @@ public class SeaTunnelFTPFileSystem extends FileSystem {
*/
private boolean mkdirs(FTPClient client, Path file, FsPermission
permission)
throws IOException {
- boolean created = true;
Path workDir = new Path(client.printWorkingDirectory());
Path absolute = makeAbsolute(workDir, file);
- String pathName = absolute.getName();
- if (!exists(client, absolute)) {
- Path parent = absolute.getParent();
- created = parent == null || mkdirs(client, parent,
FsPermission.getDirDefault());
- if (created) {
- String parentDir = parent.toUri().getPath();
- client.changeWorkingDirectory(parentDir);
- LOG.debug("Creating directory " + pathName);
- created = client.makeDirectory(pathName);
+ // If the path already exists, return true for a directory and fail for a file
+ if (exists(client, absolute)) {
+ if (isFile(client, absolute)) {
+ throw new ParentNotDirectoryException(
+ String.format(
+ "Can't make directory for path %s since it is
a file.", absolute));
}
- } else if (isFile(client, absolute)) {
- throw new ParentNotDirectoryException(
+ return true;
+ }
+
+ // Create parent directories if they don't exist
+ Path parent = absolute.getParent();
+ if (parent != null && !exists(client, parent)) {
+ mkdirs(client, parent, FsPermission.getDirDefault());
+ }
+
+ // Create the directory
+ String pathName = absolute.getName();
+ String parentDir = parent != null ? parent.toUri().getPath() : "/";
+
+ // Change to parent directory
+ if (!client.changeWorkingDirectory(parentDir)) {
+ throw new IOException(
String.format(
- "Can't make directory for path %s since it is a
file.", absolute));
- } else {
- LOG.debug("Skipping creation of existing directory " + file);
+ "Failed to change working directory to %s, FTP
reply code: %d, reply string: %s",
+ parentDir, client.getReplyCode(),
client.getReplyString()));
}
+ // Create directory
+ boolean created = client.makeDirectory(pathName);
if (!created) {
- LOG.debug("Failed to create " + file);
+ // Double check if directory was actually created (some FTP
servers don't return true)
+ if (!exists(client, absolute)) {
+ throw new IOException(
+ String.format(
+ "Failed to create directory %s in %s, FTP
reply code: %d, reply string: %s",
+ pathName,
+ parentDir,
+ client.getReplyCode(),
+ client.getReplyString()));
+ }
}
- return created;
+ return true;
}
/**
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/system/SeaTunnelFTPFileSystemTest.java
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/system/SeaTunnelFTPFileSystemTest.java
new file mode 100644
index 0000000000..fd6a1a9d05
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/system/SeaTunnelFTPFileSystemTest.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.ftp.system;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockftpserver.fake.FakeFtpServer;
+import org.mockftpserver.fake.UserAccount;
+import org.mockftpserver.fake.filesystem.DirectoryEntry;
+import org.mockftpserver.fake.filesystem.FileEntry;
+import org.mockftpserver.fake.filesystem.FileSystem;
+import org.mockftpserver.fake.filesystem.UnixFakeFileSystem;
+
+import java.io.IOException;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** Unit tests for SeaTunnelFTPFileSystem. */
+public class SeaTunnelFTPFileSystemTest {
+
+ private static final String USERNAME = "testuser";
+ private static final String PASSWORD = "testpass";
+ private static final String HOME_DIR = "/home/testuser";
+ private static final int SERVER_PORT = 0; // Use random port
+
+ private FakeFtpServer fakeFtpServer;
+ private SeaTunnelFTPFileSystem ftpFileSystem;
+ private Configuration conf;
+ private int serverPort;
+
+ @BeforeEach
+ public void setUp() throws Exception {
+ // Set up the mock FTP server
+ fakeFtpServer = new FakeFtpServer();
+ fakeFtpServer.setServerControlPort(SERVER_PORT);
+
+ // Create user account
+ UserAccount userAccount = new UserAccount(USERNAME, PASSWORD,
HOME_DIR);
+ fakeFtpServer.addUserAccount(userAccount);
+
+ // Set up the file system
+ FileSystem fileSystem = new UnixFakeFileSystem();
+ fileSystem.add(new DirectoryEntry(HOME_DIR));
+ fileSystem.add(new FileEntry(HOME_DIR + "/test.txt", "Test content"));
+ fakeFtpServer.setFileSystem(fileSystem);
+
+ // Start the FTP server
+ fakeFtpServer.start();
+ serverPort = fakeFtpServer.getServerControlPort();
+
+ // Configure the FTP client
+ conf = new Configuration();
+ conf.set("fs.ftp.host", "localhost");
+ conf.setInt("fs.ftp.host.port", serverPort);
+ conf.set("fs.ftp.user.localhost", USERNAME);
+ conf.set("fs.ftp.password.localhost", PASSWORD);
+
+ // Initialize the FTP file system
+ ftpFileSystem = new SeaTunnelFTPFileSystem();
+ ftpFileSystem.initialize(new URI("ftp://localhost:" + serverPort),
conf);
+ }
+
+ @AfterEach
+ public void tearDown() {
+ if (fakeFtpServer != null) {
+ fakeFtpServer.stop();
+ }
+ }
+
+ @Test
+ public void testMkdirs() throws IOException {
+ Path testDir = new Path(HOME_DIR + "/testDir/subDir");
+
+ // Create parent directories recursively
+ assertTrue(ftpFileSystem.mkdirs(testDir));
+
+ // Verify both parent and child directories exist
+ assertTrue(ftpFileSystem.exists(new Path(HOME_DIR + "/testDir")));
+ assertTrue(ftpFileSystem.exists(testDir));
+
+ // Verify it's really a directory
+ FileStatus status = ftpFileSystem.getFileStatus(testDir);
+ assertTrue(status.isDirectory());
+ }
+
+ @Test
+ public void testCreateAndDeleteFile() throws IOException {
+ Path testFile = new Path(HOME_DIR + "/newfile.txt");
+ String content = "Hello, World!";
+
+ // Create file
+ try (FSDataOutputStream out =
+ ftpFileSystem.create(testFile, null, false, 1024, (short) 1,
1024, null)) {
+ out.write(content.getBytes(StandardCharsets.UTF_8));
+ }
+
+ // Verify file exists
+ assertTrue(ftpFileSystem.exists(testFile));
+
+ // Read file content
+ try (FSDataInputStream in = ftpFileSystem.open(testFile, 1024)) {
+ byte[] buffer = new byte[content.length()];
+ in.readFully(buffer);
+ assertEquals(content, new String(buffer, StandardCharsets.UTF_8));
+ }
+
+ // Delete file
+ assertTrue(ftpFileSystem.delete(testFile, false));
+ assertFalse(ftpFileSystem.exists(testFile));
+ }
+
+ @Test
+ public void testListStatus() throws IOException {
+ // Create test directory structure
+ Path testDir = new Path(HOME_DIR + "/testListDir");
+ ftpFileSystem.mkdirs(testDir, null);
+
+ Path testFile1 = new Path(testDir, "file1.txt");
+ Path testFile2 = new Path(testDir, "file2.txt");
+
+ try (FSDataOutputStream out =
+ ftpFileSystem.create(testFile1, null, false, 1024, (short) 1,
1024, null)) {
+ out.write("content1".getBytes(StandardCharsets.UTF_8));
+ }
+ try (FSDataOutputStream out =
+ ftpFileSystem.create(testFile2, null, false, 1024, (short) 1,
1024, null)) {
+ out.write("content2".getBytes(StandardCharsets.UTF_8));
+ }
+
+ FileStatus[] statuses = ftpFileSystem.listStatus(testDir);
+ assertEquals(2, statuses.length);
+
+ // Clean up
+ ftpFileSystem.delete(testDir, true);
+ }
+
+ @Test
+ public void testRename() throws IOException {
+ Path source = new Path(HOME_DIR + "/source.txt");
+ Path target = new Path(HOME_DIR + "/target.txt");
+
+ // Create source file
+ try (FSDataOutputStream out =
+ ftpFileSystem.create(source, null, false, 1024, (short) 1,
1024, null)) {
+ out.write("test content".getBytes(StandardCharsets.UTF_8));
+ }
+
+ // Rename file
+ assertTrue(ftpFileSystem.rename(source, target));
+ assertFalse(ftpFileSystem.exists(source));
+ assertTrue(ftpFileSystem.exists(target));
+ }
+
+ @Test
+ public void testConnectionModes() throws Exception {
+ // Test passive mode
+ conf.set("fs.ftp.connection.mode", "PASSIVE_LOCAL");
+ ftpFileSystem.initialize(new URI("ftp://localhost:" + serverPort),
conf);
+ Path testFile = new Path(HOME_DIR + "/passive_test.txt");
+ assertTrue(ftpFileSystem.mkdirs(testFile.getParent(), null));
+
+ // Test active mode
+ conf.set("fs.ftp.connection.mode", "ACTIVE_LOCAL");
+ ftpFileSystem.initialize(new URI("ftp://localhost:" + serverPort),
conf);
+ Path testFile2 = new Path(HOME_DIR + "/active_test.txt");
+ assertTrue(ftpFileSystem.mkdirs(testFile2.getParent(), null));
+ }
+
+ @Test
+ public void testMkdirsWithPermission() throws IOException {
+ Path testDir = new Path(HOME_DIR + "/testDir/subDir");
+ FsPermission permission = FsPermission.createImmutable((short) 0755);
// rwxr-xr-x
+
+ // Create parent directories recursively with permission
+ assertTrue(ftpFileSystem.mkdirs(testDir, permission));
+
+ // Verify both parent and child directories exist
+ assertTrue(ftpFileSystem.exists(new Path(HOME_DIR + "/testDir")));
+ assertTrue(ftpFileSystem.exists(testDir));
+
+ // Verify it's really a directory
+ FileStatus status = ftpFileSystem.getFileStatus(testDir);
+ assertTrue(status.isDirectory());
+
+ // Verify directory was created in the mock filesystem
+ DirectoryEntry dirEntry =
+ (DirectoryEntry)
fakeFtpServer.getFileSystem().getEntry(testDir.toString());
+ assertNotNull(dirEntry);
+ }
+
+ @Test
+ public void testMkdirsWithNullPermission() throws IOException {
+ Path testDir = new Path(HOME_DIR + "/testDir/subDir");
+
+ // Create parent directories recursively with null permission
+ assertTrue(ftpFileSystem.mkdirs(testDir, null));
+
+ // Verify both parent and child directories exist
+ assertTrue(ftpFileSystem.exists(new Path(HOME_DIR + "/testDir")));
+ assertTrue(ftpFileSystem.exists(testDir));
+
+ // Verify it's really a directory
+ FileStatus status = ftpFileSystem.getFileStatus(testDir);
+ assertTrue(status.isDirectory());
+ // Don't verify the exact permission since it may vary by system
+ assertNotNull(status.getPermission());
+ }
+
+ @Test
+ public void testMkdirsWithNestedDirectories() throws IOException {
+ Path deepDir = new Path(HOME_DIR + "/a/b/c/d");
+ FsPermission permission = FsPermission.createImmutable((short) 0755);
+
+ // Create nested directories
+ assertTrue(ftpFileSystem.mkdirs(deepDir, permission));
+
+ // Verify all parent directories exist
+ assertTrue(ftpFileSystem.exists(new Path(HOME_DIR + "/a")));
+ assertTrue(ftpFileSystem.exists(new Path(HOME_DIR + "/a/b")));
+ assertTrue(ftpFileSystem.exists(new Path(HOME_DIR + "/a/b/c")));
+ assertTrue(ftpFileSystem.exists(deepDir));
+
+ // Verify the deepest path is a directory
+ assertTrue(ftpFileSystem.getFileStatus(deepDir).isDirectory());
+ }
+
+ @Test
+ public void testMkdirsWithExistingDirectory() throws IOException {
+ Path testDir = new Path(HOME_DIR + "/existing");
+
+ // Create directory first time
+ assertTrue(ftpFileSystem.mkdirs(testDir));
+
+ // Try to create same directory again
+ assertTrue(ftpFileSystem.mkdirs(testDir));
+
+ // Verify it's still a directory
+ assertTrue(ftpFileSystem.getFileStatus(testDir).isDirectory());
+ }
+}