This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new f6bcc4d59d [Feature][Connectors-v2-file-ftp] FTP source/sink add ftp
connection mode (#6077) (#6099)
f6bcc4d59d is described below
commit f6bcc4d59d91e6c47f0726fa6051b6c324ae5c2e
Author: mingbei.xu <[email protected]>
AuthorDate: Tue Jan 2 18:01:36 2024 +0800
[Feature][Connectors-v2-file-ftp] FTP source/sink add ftp connection mode
(#6077) (#6099)
---
docs/en/connector-v2/sink/FtpFile.md | 7 ++
docs/en/connector-v2/source/FtpFile.md | 7 ++
.../seatunnel/file/config/BaseFileSinkConfig.java | 5 ++
.../seatunnel/file/ftp/config/FtpConf.java | 5 ++
.../file/ftp/config/FtpConfigOptions.java | 8 ++
.../file/ftp/sink/FtpFileSinkFactory.java | 1 +
.../file/ftp/source/FtpFileSourceFactory.java | 1 +
.../file/ftp/system/FtpConnectionMode.java | 47 ++++++++++++
.../file/ftp/system/SeaTunnelFTPFileSystem.java | 27 +++++++
.../e2e/connector/file/ftp/FtpFileIT.java | 5 ++
.../excel/fake_source_to_ftp_root_path_excel.conf | 85 ++++++++++++++++++++++
11 files changed, 198 insertions(+)
diff --git a/docs/en/connector-v2/sink/FtpFile.md
b/docs/en/connector-v2/sink/FtpFile.md
index ab55b6e4da..3233fc3c6d 100644
--- a/docs/en/connector-v2/sink/FtpFile.md
+++ b/docs/en/connector-v2/sink/FtpFile.md
@@ -38,6 +38,7 @@ By default, we use 2PC commit to ensure `exactly-once`
| password | string | yes | -
|
|
| path | string | yes | -
|
|
| tmp_path | string | yes | /tmp/seatunnel
| The result file will write to a tmp path first and then
use `mv` to submit tmp dir to target dir. Need a FTP dir. |
+| connection_mode | string | no | active_local
| The target ftp connection mode
|
| custom_filename | boolean | no | false
| Whether you need custom the filename
|
| file_name_expression | string | no | "${transactionId}"
| Only used when custom_filename is true
|
| filename_time_format | string | no | "yyyy.MM.dd"
| Only used when custom_filename is true
|
@@ -76,6 +77,12 @@ The target ftp password is required
The target dir path is required.
+### connection_mode [string]
+
+The target FTP connection mode. The default is `active_local` (active mode); the following modes are supported:
+
+`active_local` `passive_local`
+
### custom_filename [boolean]
Whether custom the filename
diff --git a/docs/en/connector-v2/source/FtpFile.md
b/docs/en/connector-v2/source/FtpFile.md
index 781d7d40bc..ee231bb087 100644
--- a/docs/en/connector-v2/source/FtpFile.md
+++ b/docs/en/connector-v2/source/FtpFile.md
@@ -44,6 +44,7 @@ If you use SeaTunnel Engine, It automatically integrated the
hadoop jar when you
| password | string | yes | - |
| path | string | yes | - |
| file_format_type | string | yes | - |
+| connection_mode | string | no | active_local |
| delimiter/field_delimiter | string | no | \001 |
| read_columns | list | no | - |
| parse_partition_from_path | boolean | no | true |
@@ -154,6 +155,12 @@ connector will generate data as the following:
|---------------|-----|--------|
| tyrantlucifer | 26 | male |
+### connection_mode [string]
+
+The target FTP connection mode. The default is `active_local` (active mode); the following modes are supported:
+
+`active_local` `passive_local`
+
### delimiter/field_delimiter [string]
**delimiter** parameter will deprecate after version 2.3.5, please use
**field_delimiter** instead.
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java
index 112ab9fa1c..3a6513e993 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java
@@ -28,6 +28,7 @@ import org.apache.commons.lang3.StringUtils;
import lombok.Data;
import lombok.NonNull;
+import java.io.File;
import java.io.Serializable;
import java.util.Locale;
@@ -71,6 +72,10 @@ public class BaseFileSinkConfig implements DelimiterConfig,
Serializable {
}
checkNotNull(path);
+ if (path.equals(File.separator)) {
+ this.path = "";
+ }
+
if (config.hasPath(BaseSinkConfig.FILE_NAME_EXPRESSION.key())
&& !StringUtils.isBlank(
config.getString(BaseSinkConfig.FILE_NAME_EXPRESSION.key()))) {
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/config/FtpConf.java
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/config/FtpConf.java
index 7ab43b6db1..9186e1d8ee 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/config/FtpConf.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/config/FtpConf.java
@@ -52,6 +52,11 @@ public class FtpConf extends HadoopConf {
"fs.ftp.user." + host,
config.getString(FtpConfigOptions.FTP_USERNAME.key()));
ftpOptions.put(
"fs.ftp.password." + host,
config.getString(FtpConfigOptions.FTP_PASSWORD.key()));
+ if (config.hasPath(FtpConfigOptions.FTP_CONNECTION_MODE.key())) {
+ ftpOptions.put(
+ "fs.ftp.connection.mode",
+
config.getString(FtpConfigOptions.FTP_CONNECTION_MODE.key()));
+ }
hadoopConf.setExtraOptions(ftpOptions);
return hadoopConf;
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/config/FtpConfigOptions.java
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/config/FtpConfigOptions.java
index 2834b7ac2f..1f00a56abf 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/config/FtpConfigOptions.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/config/FtpConfigOptions.java
@@ -20,6 +20,9 @@ package
org.apache.seatunnel.connectors.seatunnel.file.ftp.config;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import
org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions;
+import
org.apache.seatunnel.connectors.seatunnel.file.ftp.system.FtpConnectionMode;
+
+import static
org.apache.seatunnel.connectors.seatunnel.file.ftp.system.FtpConnectionMode.ACTIVE_LOCAL_DATA_CONNECTION_MODE;
public class FtpConfigOptions extends BaseSourceConfigOptions {
public static final Option<String> FTP_PASSWORD =
@@ -36,4 +39,9 @@ public class FtpConfigOptions extends BaseSourceConfigOptions
{
Options.key("host").stringType().noDefaultValue().withDescription("FTP server
host");
public static final Option<Integer> FTP_PORT =
Options.key("port").intType().noDefaultValue().withDescription("FTP server
port");
+ public static final Option<FtpConnectionMode> FTP_CONNECTION_MODE =
+ Options.key("connection_mode")
+ .enumType(FtpConnectionMode.class)
+ .defaultValue(ACTIVE_LOCAL_DATA_CONNECTION_MODE)
+ .withDescription("FTP server connection mode ");
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSinkFactory.java
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSinkFactory.java
index 2cc9a06a5f..a3fbf886fb 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSinkFactory.java
@@ -85,6 +85,7 @@ public class FtpFileSinkFactory implements TableSinkFactory {
.optional(BaseSinkConfig.DATE_FORMAT)
.optional(BaseSinkConfig.DATETIME_FORMAT)
.optional(BaseSinkConfig.TIME_FORMAT)
+ .optional(FtpConfigOptions.FTP_CONNECTION_MODE)
.build();
}
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSourceFactory.java
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSourceFactory.java
index e15afac55a..529c93a3f7 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSourceFactory.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSourceFactory.java
@@ -62,6 +62,7 @@ public class FtpFileSourceFactory implements
TableSourceFactory {
.optional(BaseSourceConfigOptions.TIME_FORMAT)
.optional(BaseSourceConfigOptions.FILE_FILTER_PATTERN)
.optional(BaseSourceConfigOptions.COMPRESS_CODEC)
+ .optional(FtpConfigOptions.FTP_CONNECTION_MODE)
.build();
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/system/FtpConnectionMode.java
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/system/FtpConnectionMode.java
new file mode 100644
index 0000000000..068aa5974c
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/system/FtpConnectionMode.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.ftp.system;
+
/**
 * FTP data connection mode, mirroring the local active/passive data connection modes of the <a
 * href="http://commons.apache.org/net/">Apache Commons Net</a> {@code FTPClient}.
 */
public enum FtpConnectionMode {

    /** Local active data connection mode, configured as {@code active_local}. */
    ACTIVE_LOCAL_DATA_CONNECTION_MODE("active_local"),

    /** Local passive data connection mode, configured as {@code passive_local}. */
    PASSIVE_LOCAL_DATA_CONNECTION_MODE("passive_local");

    private final String mode;

    FtpConnectionMode(String mode) {
        this.mode = mode;
    }

    /**
     * Returns the configuration value for this mode.
     *
     * @return the mode string, e.g. {@code active_local}
     */
    public String getMode() {
        return mode;
    }

    /**
     * Resolves a connection mode from its configured value.
     *
     * <p>Matching ignores surrounding whitespace and letter case. It also accepts the enum constant
     * name (e.g. {@code ACTIVE_LOCAL_DATA_CONNECTION_MODE}). The {@code connection_mode} option is
     * declared as an enum-typed option, so its value may be supplied or rendered by constant name
     * rather than by mode string.
     *
     * @param mode the configured mode, e.g. {@code active_local} or {@code passive_local}
     * @return the matching connection mode
     * @throws IllegalArgumentException if {@code mode} is null or matches no known mode
     */
    public static FtpConnectionMode fromMode(String mode) {
        if (mode != null) {
            String candidate = mode.trim();
            for (FtpConnectionMode connectionMode : values()) {
                if (connectionMode.mode.equalsIgnoreCase(candidate)
                        || connectionMode.name().equalsIgnoreCase(candidate)) {
                    return connectionMode;
                }
            }
        }
        throw new IllegalArgumentException("Unknown ftp connection mode: " + mode);
    }
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/system/SeaTunnelFTPFileSystem.java
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/system/SeaTunnelFTPFileSystem.java
index 4b69c63416..04ba218e45 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/system/SeaTunnelFTPFileSystem.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/system/SeaTunnelFTPFileSystem.java
@@ -62,6 +62,8 @@ public class SeaTunnelFTPFileSystem extends FileSystem {
public static final String FS_FTP_HOST = "fs.ftp.host";
public static final String FS_FTP_HOST_PORT = "fs.ftp.host.port";
public static final String FS_FTP_PASSWORD_PREFIX = "fs.ftp.password.";
+ public static final String FS_FTP_CONNECTION_MODE =
"fs.ftp.connection.mode";
+
public static final String E_SAME_DIRECTORY_ONLY = "only same directory
renames are supported";
private URI uri;
@@ -153,9 +155,34 @@ public class SeaTunnelFTPFileSystem extends FileSystem {
+ "'");
}
+ setFsFtpConnectionMode(
+ client,
+ conf.get(
+ FS_FTP_CONNECTION_MODE,
+
FtpConnectionMode.ACTIVE_LOCAL_DATA_CONNECTION_MODE.getMode()));
+
return client;
}
+ /**
+ * Set FTP connection mode. *
+ *
+ * @param client FTPClient
+ * @param mode mode
+ */
+ private void setFsFtpConnectionMode(FTPClient client, String mode) {
+ switch (FtpConnectionMode.fromMode(mode)) {
+ case ACTIVE_LOCAL_DATA_CONNECTION_MODE:
+ client.enterLocalActiveMode();
+ break;
+ case PASSIVE_LOCAL_DATA_CONNECTION_MODE:
+ client.enterLocalPassiveMode();
+ break;
+ default:
+ break;
+ }
+ }
+
/**
* Logout and disconnect the given FTPClient. *
*
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java
index 15a58ebf08..2a1598bf32 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java
@@ -104,6 +104,9 @@ public class FtpFileIT extends TestSuiteBase implements
TestResource {
"/home/vsftpd/seatunnel/tmp/seatunnel/read/excel_filter/name=tyrantlucifer/hobby=coding/e2e_filter.xlsx",
ftpContainer);
+ ContainerUtil.copyFileIntoContainers(
+ "/excel/e2e.xlsx", "/home/vsftpd/seatunnel/e2e.xlsx",
ftpContainer);
+
ftpContainer.execInContainer("sh", "-c", "chmod -R 777
/home/vsftpd/seatunnel/");
ftpContainer.execInContainer("sh", "-c", "chown -R ftp:ftp
/home/vsftpd/seatunnel/");
}
@@ -136,6 +139,8 @@ public class FtpFileIT extends TestSuiteBase implements
TestResource {
helper.execute("/parquet/fake_to_ftp_file_parquet.conf");
// test write ftp orc file
helper.execute("/orc/fake_to_ftp_file_orc.conf");
+ // test write ftp root path excel file
+ helper.execute("/excel/fake_source_to_ftp_root_path_excel.conf");
}
@AfterAll
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/excel/fake_source_to_ftp_root_path_excel.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/excel/fake_source_to_ftp_root_path_excel.conf
new file mode 100644
index 0000000000..3e11b0a08f
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/excel/fake_source_to_ftp_root_path_excel.conf
@@ -0,0 +1,85 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+
+ # You can set spark configuration here
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 1
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ FakeSource {
+ result_table_name = "ftp"
+ schema = {
+ fields {
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ c_row = {
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ }
+ }
+ }
+ }
+}
+
+sink {
+ FtpFile {
+ host = "ftp"
+ port = 21
+ user = seatunnel
+ password = pass
+ path = "/"
+ source_table_name = "ftp"
+ partition_dir_expression = "${k0}=${v0}"
+ is_partition_field_write_in_file = true
+ file_name_expression = "${transactionId}_${now}"
+ file_format_type = "excel"
+ filename_time_format = "yyyy.MM.dd"
+ is_enable_transaction = true
+ }
+}