This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 26c528a5ed [Fix][Connector-V2][FTP] Fix FTP connector connection_mode
not taking effect (#7865)
26c528a5ed is described below
commit 26c528a5ed55175c3980ff6fc9725679387f4dc3
Author: Jast <[email protected]>
AuthorDate: Wed Oct 23 14:22:14 2024 +0800
[Fix][Connector-V2][FTP] Fix FTP connector connection_mode not taking effect
(#7865)
---
.../file/ftp/config/FtpConfigOptions.java | 4 +-
.../file/ftp/system/FtpConnectionMode.java | 6 +-
.../file/ftp/system/SeaTunnelFTPFileSystem.java | 21 ++--
.../e2e/connector/file/ftp/FtpFileIT.java | 72 ++++++++++-
.../text/fake_to_ftp_file_text_for_passive.conf | 88 +++++++++++++
.../text/ftp_file_text_to_assert_for_passive.conf | 136 +++++++++++++++++++++
6 files changed, 312 insertions(+), 15 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/config/FtpConfigOptions.java
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/config/FtpConfigOptions.java
index 1f00a56abf..645225b9ea 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/config/FtpConfigOptions.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/config/FtpConfigOptions.java
@@ -22,7 +22,7 @@ import org.apache.seatunnel.api.configuration.Options;
import
org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions;
import
org.apache.seatunnel.connectors.seatunnel.file.ftp.system.FtpConnectionMode;
-import static
org.apache.seatunnel.connectors.seatunnel.file.ftp.system.FtpConnectionMode.ACTIVE_LOCAL_DATA_CONNECTION_MODE;
+import static
org.apache.seatunnel.connectors.seatunnel.file.ftp.system.FtpConnectionMode.ACTIVE_LOCAL;
public class FtpConfigOptions extends BaseSourceConfigOptions {
public static final Option<String> FTP_PASSWORD =
@@ -42,6 +42,6 @@ public class FtpConfigOptions extends BaseSourceConfigOptions
{
public static final Option<FtpConnectionMode> FTP_CONNECTION_MODE =
Options.key("connection_mode")
.enumType(FtpConnectionMode.class)
- .defaultValue(ACTIVE_LOCAL_DATA_CONNECTION_MODE)
+ .defaultValue(ACTIVE_LOCAL)
.withDescription("FTP server connection mode ");
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/system/FtpConnectionMode.java
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/system/FtpConnectionMode.java
index 068aa5974c..44f2264fb2 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/system/FtpConnectionMode.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/system/FtpConnectionMode.java
@@ -21,10 +21,10 @@ package
org.apache.seatunnel.connectors.seatunnel.file.ftp.system;
public enum FtpConnectionMode {
/** ACTIVE_LOCAL_DATA_CONNECTION_MODE */
- ACTIVE_LOCAL_DATA_CONNECTION_MODE("active_local"),
+ ACTIVE_LOCAL("active_local"),
/** PASSIVE_LOCAL_DATA_CONNECTION_MODE */
- PASSIVE_LOCAL_DATA_CONNECTION_MODE("passive_local");
+ PASSIVE_LOCAL("passive_local");
private final String mode;
@@ -38,7 +38,7 @@ public enum FtpConnectionMode {
public static FtpConnectionMode fromMode(String mode) {
for (FtpConnectionMode ftpConnectionModeEnum :
FtpConnectionMode.values()) {
- if (ftpConnectionModeEnum.getMode().equals(mode)) {
+ if (ftpConnectionModeEnum.getMode().equals(mode.toLowerCase())) {
return ftpConnectionModeEnum;
}
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/system/SeaTunnelFTPFileSystem.java
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/system/SeaTunnelFTPFileSystem.java
index 04ba218e45..029890918d 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/system/SeaTunnelFTPFileSystem.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/system/SeaTunnelFTPFileSystem.java
@@ -40,6 +40,8 @@ import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.Progressable;
+import lombok.extern.slf4j.Slf4j;
+
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
@@ -52,6 +54,7 @@ import java.net.URI;
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
+@Slf4j
public class SeaTunnelFTPFileSystem extends FileSystem {
public static final Log LOG =
LogFactory.getLog(SeaTunnelFTPFileSystem.class);
@@ -156,10 +159,7 @@ public class SeaTunnelFTPFileSystem extends FileSystem {
}
setFsFtpConnectionMode(
- client,
- conf.get(
- FS_FTP_CONNECTION_MODE,
-
FtpConnectionMode.ACTIVE_LOCAL_DATA_CONNECTION_MODE.getMode()));
+ client, conf.get(FS_FTP_CONNECTION_MODE,
FtpConnectionMode.ACTIVE_LOCAL.getMode()));
return client;
}
@@ -172,13 +172,18 @@ public class SeaTunnelFTPFileSystem extends FileSystem {
*/
private void setFsFtpConnectionMode(FTPClient client, String mode) {
switch (FtpConnectionMode.fromMode(mode)) {
- case ACTIVE_LOCAL_DATA_CONNECTION_MODE:
- client.enterLocalActiveMode();
- break;
- case PASSIVE_LOCAL_DATA_CONNECTION_MODE:
+ case PASSIVE_LOCAL:
client.enterLocalPassiveMode();
break;
+ case ACTIVE_LOCAL:
+ client.enterLocalActiveMode();
+ break;
default:
+ log.warn(
+ "Unsupported FTP connection mode: " + mode,
+ " Using default FTP connection mode: "
+ + FtpConnectionMode.ACTIVE_LOCAL.getMode());
+ client.enterLocalActiveMode();
break;
}
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java
index 70b2463ea8..bc246e9007 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java
@@ -31,8 +31,10 @@ import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.lifecycle.Startables;
import
org.testcontainers.shaded.com.github.dockerjava.core.command.ExecStartResultCallback;
@@ -42,10 +44,15 @@ import lombok.extern.slf4j.Slf4j;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.io.StringReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Properties;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import java.util.stream.Stream;
@DisabledOnContainer(
@@ -68,14 +75,30 @@ public class FtpFileIT extends TestSuiteBase implements
TestResource {
private GenericContainer<?> ftpContainer;
+ private String ftpPassiveAddress;
+
+ private BiFunction<Integer, Integer, Integer[]> generateExposedPorts =
+ (startPort, endPort) ->
+ IntStream.rangeClosed(startPort,
endPort).boxed().toArray(Integer[]::new);
+
+ private BiFunction<Integer, Integer, List<String>> generatePortBindings =
+ (startPort, endPort) ->
+ IntStream.rangeClosed(startPort, endPort)
+ .mapToObj(i -> i + ":" + i)
+ .collect(Collectors.toList());
+
@BeforeAll
@Override
public void startUp() throws Exception {
+ int passiveStartPort = 30000;
+ int passiveEndPort = 30004;
ftpContainer =
new GenericContainer<>(FTP_IMAGE)
.withExposedPorts(FTP_PORT)
.withNetwork(NETWORK)
.withExposedPorts(FTP_PORT)
+ .withExposedPorts(
+ generateExposedPorts.apply(passiveStartPort,
passiveEndPort))
.withNetworkAliases(ftp_CONTAINER_HOST)
.withEnv("FILE_OPEN_MODE", "0666")
.withEnv("WRITE_ENABLE", "YES")
@@ -85,13 +108,31 @@ public class FtpFileIT extends TestSuiteBase implements
TestResource {
.withEnv("LOCAL_UMASK", "000")
.withEnv("FTP_USER", USERNAME)
.withEnv("FTP_PASS", PASSWORD)
- .withEnv("PASV_ADDRESS", "0.0.0.0")
+ .withEnv("PASV_MIN_PORT",
String.valueOf(passiveStartPort))
+ .withEnv("PASV_MAX_PORT",
String.valueOf(passiveEndPort))
.withLogConsumer(new Slf4jLogConsumer(log))
+ // Modify the strategy mode because the passive mode
port does not need to
+ // be checked here, it does not start with the FTP
startup.
+ .waitingFor(Wait.forLogMessage(".*", 1))
.withPrivilegedMode(true);
- ftpContainer.setPortBindings(Collections.singletonList("21:21"));
+ List<String> portBind = new ArrayList<>();
+ portBind.add("21:21");
+ portBind.addAll(generatePortBindings.apply(passiveStartPort,
passiveEndPort));
+
+ ftpContainer.setPortBindings(portBind);
ftpContainer.start();
Startables.deepStart(Stream.of(ftpContainer)).join();
+
+ // Get the passive mode address of the FTP container
+ Properties properties = new Properties();
+ properties.load(
+ new StringReader(
+ ftpContainer
+ .execInContainer("sh", "-c", "cat
/etc/vsftpd/vsftpd.conf")
+ .getStdout()));
+ ftpPassiveAddress = properties.getProperty("pasv_address");
+
log.info("ftp container started");
ContainerUtil.copyFileIntoContainers(
@@ -126,6 +167,33 @@ public class FtpFileIT extends TestSuiteBase implements
TestResource {
ftpContainer.execInContainer("sh", "-c", "chown -R ftp:ftp
/home/vsftpd/seatunnel/");
}
+ @TestTemplate
+ public void testFtpFileReadAndWriteForPassive(TestContainer container)
+ throws IOException, InterruptedException {
+ List<String> configParams = Collections.singletonList("ftpHost=" +
ftpPassiveAddress);
+ // Test passive mode
+ assertJobExecution(
+ container, "/text/ftp_file_text_to_assert_for_passive.conf",
configParams);
+ assertJobExecution(container,
"/text/fake_to_ftp_file_text_for_passive.conf", configParams);
+
+ String homePath = "/home/vsftpd/seatunnel/tmp/seatunnel/passive_text";
+ // test write ftp text file
+ Assertions.assertEquals(1, getFileListFromContainer(homePath).size());
+
+ // Confirm data is written correctly
+ Container.ExecResult execResult =
+ ftpContainer.execInContainer("sh", "-c", "awk 'END {print NR}'
" + homePath + "/*");
+ Assertions.assertEquals("15", execResult.getStdout().trim());
+
+ deleteFileFromContainer(homePath);
+ }
+
+ private void assertJobExecution(TestContainer container, String
configPath, List<String> params)
+ throws IOException, InterruptedException {
+ Container.ExecResult execResult = container.executeJob(configPath,
params);
+ Assertions.assertEquals(0, execResult.getExitCode(),
execResult.getStderr());
+ }
+
@TestTemplate
public void testFtpFileReadAndWrite(TestContainer container)
throws IOException, InterruptedException {
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/text/fake_to_ftp_file_text_for_passive.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/text/fake_to_ftp_file_text_for_passive.conf
new file mode 100644
index 0000000000..d81c1ff6a9
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/text/fake_to_ftp_file_text_for_passive.conf
@@ -0,0 +1,88 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+
+ # You can set spark configuration here
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 1
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ FakeSource {
+ result_table_name = "ftp"
+ row.num = 15
+ schema = {
+ fields {
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ c_row = {
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ }
+ }
+ }
+ }
+}
+
+sink {
+ FtpFile {
+ host = ${ftpHost}
+ port = 21
+ user = seatunnel
+ password = pass
+ connection_mode = "passive_local"
+ path = "/tmp/seatunnel/passive_text"
+ source_table_name = "ftp"
+ row_delimiter = "\n"
+ partition_dir_expression = "${k0}=${v0}"
+ is_partition_field_write_in_file = true
+ file_name_expression = "${transactionId}_${now}"
+ file_format_type = "text"
+ filename_time_format = "yyyy.MM.dd"
+ is_enable_transaction = true
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/text/ftp_file_text_to_assert_for_passive.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/text/ftp_file_text_to_assert_for_passive.conf
new file mode 100644
index 0000000000..cfa64dd600
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/text/ftp_file_text_to_assert_for_passive.conf
@@ -0,0 +1,136 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+
+ # You can set spark configuration here
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 1
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ FtpFile {
+ host = ${ftpHost}
+ port = 21
+ user = seatunnel
+ password = pass
+ connection_mode = "passive_local"
+ path = "/tmp/seatunnel/read/text"
+ file_format_type = "text"
+ result_table_name = "ftp"
+ schema = {
+ fields {
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ c_row = {
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ }
+ }
+ }
+ }
+}
+
+sink {
+ Assert {
+ source_table_name = "ftp"
+ rules {
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 5
+ }
+ ],
+ field_rules = [
+ {
+ field_name = c_string
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = c_boolean
+ field_type = boolean
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = c_double
+ field_type = double
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = name
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = hobby
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
\ No newline at end of file