This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 26c528a5ed [Fix][Connector-V2][FTP] Fix FTP connector connection_mode is not effective (#7865)
26c528a5ed is described below

commit 26c528a5ed55175c3980ff6fc9725679387f4dc3
Author: Jast <[email protected]>
AuthorDate: Wed Oct 23 14:22:14 2024 +0800

    [Fix][Connector-V2][FTP] Fix FTP connector connection_mode is not effective (#7865)
---
 .../file/ftp/config/FtpConfigOptions.java          |   4 +-
 .../file/ftp/system/FtpConnectionMode.java         |   6 +-
 .../file/ftp/system/SeaTunnelFTPFileSystem.java    |  21 ++--
 .../e2e/connector/file/ftp/FtpFileIT.java          |  72 ++++++++++-
 .../text/fake_to_ftp_file_text_for_passive.conf    |  88 +++++++++++++
 .../text/ftp_file_text_to_assert_for_passive.conf  | 136 +++++++++++++++++++++
 6 files changed, 312 insertions(+), 15 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/config/FtpConfigOptions.java b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/config/FtpConfigOptions.java
index 1f00a56abf..645225b9ea 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/config/FtpConfigOptions.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/config/FtpConfigOptions.java
@@ -22,7 +22,7 @@ import org.apache.seatunnel.api.configuration.Options;
 import 
org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions;
 import 
org.apache.seatunnel.connectors.seatunnel.file.ftp.system.FtpConnectionMode;
 
-import static 
org.apache.seatunnel.connectors.seatunnel.file.ftp.system.FtpConnectionMode.ACTIVE_LOCAL_DATA_CONNECTION_MODE;
+import static 
org.apache.seatunnel.connectors.seatunnel.file.ftp.system.FtpConnectionMode.ACTIVE_LOCAL;
 
 public class FtpConfigOptions extends BaseSourceConfigOptions {
     public static final Option<String> FTP_PASSWORD =
@@ -42,6 +42,6 @@ public class FtpConfigOptions extends BaseSourceConfigOptions {
     public static final Option<FtpConnectionMode> FTP_CONNECTION_MODE =
             Options.key("connection_mode")
                     .enumType(FtpConnectionMode.class)
-                    .defaultValue(ACTIVE_LOCAL_DATA_CONNECTION_MODE)
+                    .defaultValue(ACTIVE_LOCAL)
                     .withDescription("FTP server connection mode ");
 }
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/system/FtpConnectionMode.java b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/system/FtpConnectionMode.java
index 068aa5974c..44f2264fb2 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/system/FtpConnectionMode.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/system/FtpConnectionMode.java
@@ -21,10 +21,10 @@ package org.apache.seatunnel.connectors.seatunnel.file.ftp.system;
 public enum FtpConnectionMode {
 
     /** ACTIVE_LOCAL_DATA_CONNECTION_MODE */
-    ACTIVE_LOCAL_DATA_CONNECTION_MODE("active_local"),
+    ACTIVE_LOCAL("active_local"),
 
     /** PASSIVE_LOCAL_DATA_CONNECTION_MODE */
-    PASSIVE_LOCAL_DATA_CONNECTION_MODE("passive_local");
+    PASSIVE_LOCAL("passive_local");
 
     private final String mode;
 
@@ -38,7 +38,7 @@ public enum FtpConnectionMode {
 
     public static FtpConnectionMode fromMode(String mode) {
         for (FtpConnectionMode ftpConnectionModeEnum : 
FtpConnectionMode.values()) {
-            if (ftpConnectionModeEnum.getMode().equals(mode)) {
+            if (ftpConnectionModeEnum.getMode().equals(mode.toLowerCase())) {
                 return ftpConnectionModeEnum;
             }
         }
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/system/SeaTunnelFTPFileSystem.java b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/system/SeaTunnelFTPFileSystem.java
index 04ba218e45..029890918d 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/system/SeaTunnelFTPFileSystem.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/system/SeaTunnelFTPFileSystem.java
@@ -40,6 +40,8 @@ import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.util.Progressable;
 
+import lombok.extern.slf4j.Slf4j;
+
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
@@ -52,6 +54,7 @@ import java.net.URI;
  */
 @InterfaceAudience.Public
 @InterfaceStability.Stable
+@Slf4j
 public class SeaTunnelFTPFileSystem extends FileSystem {
     public static final Log LOG = 
LogFactory.getLog(SeaTunnelFTPFileSystem.class);
 
@@ -156,10 +159,7 @@ public class SeaTunnelFTPFileSystem extends FileSystem {
         }
 
         setFsFtpConnectionMode(
-                client,
-                conf.get(
-                        FS_FTP_CONNECTION_MODE,
-                        
FtpConnectionMode.ACTIVE_LOCAL_DATA_CONNECTION_MODE.getMode()));
+                client, conf.get(FS_FTP_CONNECTION_MODE, 
FtpConnectionMode.ACTIVE_LOCAL.getMode()));
 
         return client;
     }
@@ -172,13 +172,18 @@ public class SeaTunnelFTPFileSystem extends FileSystem {
      */
     private void setFsFtpConnectionMode(FTPClient client, String mode) {
         switch (FtpConnectionMode.fromMode(mode)) {
-            case ACTIVE_LOCAL_DATA_CONNECTION_MODE:
-                client.enterLocalActiveMode();
-                break;
-            case PASSIVE_LOCAL_DATA_CONNECTION_MODE:
+            case PASSIVE_LOCAL:
                 client.enterLocalPassiveMode();
                 break;
+            case ACTIVE_LOCAL:
+                client.enterLocalActiveMode();
+                break;
             default:
+                log.warn(
+                        "Unsupported FTP connection mode: " + mode,
+                        " Using default FTP connection mode: "
+                                + FtpConnectionMode.ACTIVE_LOCAL.getMode());
+                client.enterLocalActiveMode();
                 break;
         }
     }
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java
index 70b2463ea8..bc246e9007 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java
@@ -31,8 +31,10 @@ import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
 import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.Wait;
 import org.testcontainers.lifecycle.Startables;
 import 
org.testcontainers.shaded.com.github.dockerjava.core.command.ExecStartResultCallback;
 
@@ -42,10 +44,15 @@ import lombok.extern.slf4j.Slf4j;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.io.StringReader;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Properties;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
 @DisabledOnContainer(
@@ -68,14 +75,30 @@ public class FtpFileIT extends TestSuiteBase implements TestResource {
 
     private GenericContainer<?> ftpContainer;
 
+    private String ftpPassiveAddress;
+
+    private BiFunction<Integer, Integer, Integer[]> generateExposedPorts =
+            (startPort, endPort) ->
+                    IntStream.rangeClosed(startPort, 
endPort).boxed().toArray(Integer[]::new);
+
+    private BiFunction<Integer, Integer, List<String>> generatePortBindings =
+            (startPort, endPort) ->
+                    IntStream.rangeClosed(startPort, endPort)
+                            .mapToObj(i -> i + ":" + i)
+                            .collect(Collectors.toList());
+
     @BeforeAll
     @Override
     public void startUp() throws Exception {
+        int passiveStartPort = 30000;
+        int passiveEndPort = 30004;
         ftpContainer =
                 new GenericContainer<>(FTP_IMAGE)
                         .withExposedPorts(FTP_PORT)
                         .withNetwork(NETWORK)
                         .withExposedPorts(FTP_PORT)
+                        .withExposedPorts(
+                                generateExposedPorts.apply(passiveStartPort, 
passiveEndPort))
                         .withNetworkAliases(ftp_CONTAINER_HOST)
                         .withEnv("FILE_OPEN_MODE", "0666")
                         .withEnv("WRITE_ENABLE", "YES")
@@ -85,13 +108,31 @@ public class FtpFileIT extends TestSuiteBase implements TestResource {
                         .withEnv("LOCAL_UMASK", "000")
                         .withEnv("FTP_USER", USERNAME)
                         .withEnv("FTP_PASS", PASSWORD)
-                        .withEnv("PASV_ADDRESS", "0.0.0.0")
+                        .withEnv("PASV_MIN_PORT", 
String.valueOf(passiveStartPort))
+                        .withEnv("PASV_MAX_PORT", 
String.valueOf(passiveEndPort))
                         .withLogConsumer(new Slf4jLogConsumer(log))
+                        // Modify the strategy mode because the passive mode 
port does not need to
+                        // be checked here, it does not start with the FTP 
startup.
+                        .waitingFor(Wait.forLogMessage(".*", 1))
                         .withPrivilegedMode(true);
 
-        ftpContainer.setPortBindings(Collections.singletonList("21:21"));
+        List<String> portBind = new ArrayList<>();
+        portBind.add("21:21");
+        portBind.addAll(generatePortBindings.apply(passiveStartPort, 
passiveEndPort));
+
+        ftpContainer.setPortBindings(portBind);
         ftpContainer.start();
         Startables.deepStart(Stream.of(ftpContainer)).join();
+
+        // Get the passive mode address of the FTP container
+        Properties properties = new Properties();
+        properties.load(
+                new StringReader(
+                        ftpContainer
+                                .execInContainer("sh", "-c", "cat 
/etc/vsftpd/vsftpd.conf")
+                                .getStdout()));
+        ftpPassiveAddress = properties.getProperty("pasv_address");
+
         log.info("ftp container started");
 
         ContainerUtil.copyFileIntoContainers(
@@ -126,6 +167,33 @@ public class FtpFileIT extends TestSuiteBase implements TestResource {
         ftpContainer.execInContainer("sh", "-c", "chown -R ftp:ftp 
/home/vsftpd/seatunnel/");
     }
 
+    @TestTemplate
+    public void testFtpFileReadAndWriteForPassive(TestContainer container)
+            throws IOException, InterruptedException {
+        List<String> configParams = Collections.singletonList("ftpHost=" + 
ftpPassiveAddress);
+        // Test passive mode
+        assertJobExecution(
+                container, "/text/ftp_file_text_to_assert_for_passive.conf", 
configParams);
+        assertJobExecution(container, 
"/text/fake_to_ftp_file_text_for_passive.conf", configParams);
+
+        String homePath = "/home/vsftpd/seatunnel/tmp/seatunnel/passive_text";
+        // test write ftp text file
+        Assertions.assertEquals(1, getFileListFromContainer(homePath).size());
+
+        // Confirm data is written correctly
+        Container.ExecResult execResult =
+                ftpContainer.execInContainer("sh", "-c", "awk 'END {print NR}' 
" + homePath + "/*");
+        Assertions.assertEquals("15", execResult.getStdout().trim());
+
+        deleteFileFromContainer(homePath);
+    }
+
+    private void assertJobExecution(TestContainer container, String 
configPath, List<String> params)
+            throws IOException, InterruptedException {
+        Container.ExecResult execResult = container.executeJob(configPath, 
params);
+        Assertions.assertEquals(0, execResult.getExitCode(), 
execResult.getStderr());
+    }
+
     @TestTemplate
     public void testFtpFileReadAndWrite(TestContainer container)
             throws IOException, InterruptedException {
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/text/fake_to_ftp_file_text_for_passive.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/text/fake_to_ftp_file_text_for_passive.conf
new file mode 100644
index 0000000000..d81c1ff6a9
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/text/fake_to_ftp_file_text_for_passive.conf
@@ -0,0 +1,88 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+
+  # You can set spark configuration here
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 1
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+}
+
+source {
+  FakeSource {
+    result_table_name = "ftp"
+    row.num = 15
+    schema = {
+      fields {
+        c_map = "map<string, string>"
+        c_array = "array<int>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_bytes = bytes
+        c_date = date
+        c_decimal = "decimal(38, 18)"
+        c_timestamp = timestamp
+        c_row = {
+          c_map = "map<string, string>"
+          c_array = "array<int>"
+          c_string = string
+          c_boolean = boolean
+          c_tinyint = tinyint
+          c_smallint = smallint
+          c_int = int
+          c_bigint = bigint
+          c_float = float
+          c_double = double
+          c_bytes = bytes
+          c_date = date
+          c_decimal = "decimal(38, 18)"
+          c_timestamp = timestamp
+        }
+      }
+    }
+  }
+}
+
+sink {
+  FtpFile {
+    host = ${ftpHost}
+    port = 21
+    user = seatunnel
+    password = pass
+    connection_mode = "passive_local"
+    path = "/tmp/seatunnel/passive_text"
+    source_table_name = "ftp"
+    row_delimiter = "\n"
+    partition_dir_expression = "${k0}=${v0}"
+    is_partition_field_write_in_file = true
+    file_name_expression = "${transactionId}_${now}"
+    file_format_type = "text"
+    filename_time_format = "yyyy.MM.dd"
+    is_enable_transaction = true
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/text/ftp_file_text_to_assert_for_passive.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/text/ftp_file_text_to_assert_for_passive.conf
new file mode 100644
index 0000000000..cfa64dd600
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/text/ftp_file_text_to_assert_for_passive.conf
@@ -0,0 +1,136 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+
+  # You can set spark configuration here
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 1
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+}
+
+source {
+  FtpFile {
+    host = ${ftpHost}
+    port = 21
+    user = seatunnel
+    password = pass
+    connection_mode = "passive_local"
+    path = "/tmp/seatunnel/read/text"
+    file_format_type = "text"
+    result_table_name = "ftp"
+    schema = {
+      fields {
+        c_map = "map<string, string>"
+        c_array = "array<int>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_bytes = bytes
+        c_date = date
+        c_decimal = "decimal(38, 18)"
+        c_timestamp = timestamp
+        c_row = {
+          c_map = "map<string, string>"
+          c_array = "array<int>"
+          c_string = string
+          c_boolean = boolean
+          c_tinyint = tinyint
+          c_smallint = smallint
+          c_int = int
+          c_bigint = bigint
+          c_float = float
+          c_double = double
+          c_bytes = bytes
+          c_date = date
+          c_decimal = "decimal(38, 18)"
+          c_timestamp = timestamp
+        }
+      }
+    }
+  }
+}
+
+sink {
+  Assert {
+    source_table_name = "ftp"
+    rules {
+      row_rules = [
+        {
+          rule_type = MAX_ROW
+          rule_value = 5
+        }
+      ],
+      field_rules = [
+        {
+          field_name = c_string
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = c_boolean
+          field_type = boolean
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = c_double
+          field_type = double
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = name
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = hobby
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        }
+      ]
+    }
+  }
+}
\ No newline at end of file

Reply via email to