This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 4f812e12ae [Feature][Connector-V2] Ftp file sink suport multiple table
and save mode (#7665)
4f812e12ae is described below
commit 4f812e12aeb20c301bdc5bc5fc93b47b6b2cee50
Author: 老王 <[email protected]>
AuthorDate: Sat Sep 14 21:42:45 2024 +0800
[Feature][Connector-V2] Ftp file sink suport multiple table and save mode
(#7665)
---
docs/en/connector-v2/sink/FtpFile.md | 42 +++++++++
.../seatunnel/file/ftp/catalog/FtpFileCatalog.java | 29 ++++++
.../file/ftp/catalog/FtpFileCatalogFactory.java | 53 +++++++++++
.../seatunnel/file/ftp/config/FtpConf.java | 25 ++---
.../seatunnel/file/ftp/sink/FtpFileSink.java | 39 ++------
.../file/ftp/sink/FtpFileSinkFactory.java | 25 ++++-
.../seatunnel/file/ftp/source/FtpFileSource.java | 3 +-
.../e2e/connector/file/ftp/FtpFileIT.java | 73 ++++++++++++++
.../text/multiple_table_fake_to_ftp_file_text.conf | 105 +++++++++++++++++++++
.../multiple_table_fake_to_ftp_file_text_2.conf | 105 +++++++++++++++++++++
10 files changed, 451 insertions(+), 48 deletions(-)
diff --git a/docs/en/connector-v2/sink/FtpFile.md
b/docs/en/connector-v2/sink/FtpFile.md
index 9305aa7e99..5b927bda12 100644
--- a/docs/en/connector-v2/sink/FtpFile.md
+++ b/docs/en/connector-v2/sink/FtpFile.md
@@ -64,6 +64,8 @@ By default, we use 2PC commit to ensure `exactly-once`
| parquet_avro_write_timestamp_as_int96 | boolean | no | false
| Only used when file_format is parquet.
|
| parquet_avro_write_fixed_as_int96 | array | no | -
| Only used when file_format is parquet.
|
| encoding | string | no | "UTF-8"
| Only used when file_format_type is
json,text,csv,xml. |
+| schema_save_mode | string | no |
CREATE_SCHEMA_WHEN_NOT_EXIST | Existing dir processing method
|
+| data_save_mode | string | no | APPEND_DATA
| Existing data processing method
|
### host [string]
@@ -227,6 +229,18 @@ Support writing Parquet INT96 from a 12-byte field, only
valid for parquet files
Only used when file_format_type is json,text,csv,xml.
The encoding of the file to write. This param will be parsed by
`Charset.forName(encoding)`.
+### schema_save_mode [string]
+Existing dir processing method.
+- RECREATE_SCHEMA: will create when the dir does not exist, delete and
recreate when the dir is exist
+- CREATE_SCHEMA_WHEN_NOT_EXIST: will create when the dir does not exist,
skipped when the dir is exist
+- ERROR_WHEN_SCHEMA_NOT_EXIST: error will be reported when the dir does not
exist
+- IGNORE :Ignore the treatment of the table
+
+### data_save_mode [string]
+Existing data processing method.
+- DROP_DATA: preserve dir and delete data files
+- APPEND_DATA: preserve dir, preserve data files
+- ERROR_WHEN_DATA_EXISTS: when there is data files, an error is reported
## Example
For text file format simple config
@@ -273,6 +287,34 @@ FtpFile {
```
+When our source end is multiple tables, and wants different expressions to
different directory, we can configure this way
+
+```hocon
+
+FtpFile {
+ host = "xxx.xxx.xxx.xxx"
+ port = 21
+ user = "username"
+ password = "password"
+ path = "/data/ftp/seatunnel/job1/${table_name}"
+ tmp_path = "/data/ftp/seatunnel/tmp"
+ file_format_type = "text"
+ field_delimiter = "\t"
+ row_delimiter = "\n"
+ have_partition = true
+ partition_by = ["age"]
+ partition_dir_expression = "${k0}=${v0}"
+ is_partition_field_write_in_file = true
+ custom_filename = true
+ file_name_expression = "${transactionId}_${now}"
+ sink_columns = ["name","age"]
+ filename_time_format = "yyyy.MM.dd"
+ schema_save_mode=RECREATE_SCHEMA
+ data_save_mode=DROP_DATA
+}
+
+```
+
## Changelog
### 2.2.0-beta 2022-09-26
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/catalog/FtpFileCatalog.java
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/catalog/FtpFileCatalog.java
new file mode 100644
index 0000000000..2bf0bf49e5
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/catalog/FtpFileCatalog.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.ftp.catalog;
+
+import
org.apache.seatunnel.connectors.seatunnel.file.catalog.AbstractFileCatalog;
+import
org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy;
+
+public class FtpFileCatalog extends AbstractFileCatalog {
+
+ public FtpFileCatalog(
+ HadoopFileSystemProxy hadoopFileSystemProxy, String filePath,
String catalogName) {
+ super(hadoopFileSystemProxy, filePath, catalogName);
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/catalog/FtpFileCatalogFactory.java
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/catalog/FtpFileCatalogFactory.java
new file mode 100644
index 0000000000..74f05c12d7
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/catalog/FtpFileCatalogFactory.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.ftp.catalog;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.factory.CatalogFactory;
+import org.apache.seatunnel.api.table.factory.Factory;
+import
org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
+import org.apache.seatunnel.connectors.seatunnel.file.ftp.config.FtpConf;
+import
org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
+public class FtpFileCatalogFactory implements CatalogFactory {
+ @Override
+ public Catalog createCatalog(String catalogName, ReadonlyConfig options) {
+ HadoopFileSystemProxy fileSystemUtils =
+ new HadoopFileSystemProxy(FtpConf.buildWithConfig(options));
+ return new FtpFileCatalog(
+ fileSystemUtils,
+ options.get(BaseSourceConfigOptions.FILE_PATH),
+ FileSystemType.FTP.getFileSystemPluginName());
+ }
+
+ @Override
+ public String factoryIdentifier() {
+ return FileSystemType.FTP.getFileSystemPluginName();
+ }
+
+ @Override
+ public OptionRule optionRule() {
+ return OptionRule.builder().build();
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/config/FtpConf.java
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/config/FtpConf.java
index 9186e1d8ee..bd98800c54 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/config/FtpConf.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/config/FtpConf.java
@@ -17,18 +17,19 @@
package org.apache.seatunnel.connectors.seatunnel.file.ftp.config;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import
org.apache.seatunnel.connectors.seatunnel.file.ftp.system.FtpConnectionMode;
import java.util.HashMap;
+import java.util.Optional;
public class FtpConf extends HadoopConf {
private static final String HDFS_IMPL =
"org.apache.seatunnel.connectors.seatunnel.file.ftp.system.SeaTunnelFTPFileSystem";
private static final String SCHEMA = "ftp";
- private FtpConf(String hdfsNameKey) {
+ public FtpConf(String hdfsNameKey) {
super(hdfsNameKey);
}
@@ -42,20 +43,20 @@ public class FtpConf extends HadoopConf {
return SCHEMA;
}
- public static HadoopConf buildWithConfig(Config config) {
- String host = config.getString(FtpConfigOptions.FTP_HOST.key());
- int port = config.getInt(FtpConfigOptions.FTP_PORT.key());
+ public static HadoopConf buildWithConfig(ReadonlyConfig config) {
+ String host = config.get(FtpConfigOptions.FTP_HOST);
+ int port = config.get(FtpConfigOptions.FTP_PORT);
String defaultFS = String.format("ftp://%s:%s", host, port);
HadoopConf hadoopConf = new FtpConf(defaultFS);
HashMap<String, String> ftpOptions = new HashMap<>();
- ftpOptions.put(
- "fs.ftp.user." + host,
config.getString(FtpConfigOptions.FTP_USERNAME.key()));
- ftpOptions.put(
- "fs.ftp.password." + host,
config.getString(FtpConfigOptions.FTP_PASSWORD.key()));
- if (config.hasPath(FtpConfigOptions.FTP_CONNECTION_MODE.key())) {
+ ftpOptions.put("fs.ftp.user." + host,
config.get(FtpConfigOptions.FTP_USERNAME));
+ ftpOptions.put("fs.ftp.password." + host,
config.get(FtpConfigOptions.FTP_PASSWORD));
+ Optional<FtpConnectionMode> optional =
+ config.getOptional(FtpConfigOptions.FTP_CONNECTION_MODE);
+ if (optional.isPresent()) {
ftpOptions.put(
"fs.ftp.connection.mode",
-
config.getString(FtpConfigOptions.FTP_CONNECTION_MODE.key()));
+
config.get(FtpConfigOptions.FTP_CONNECTION_MODE).toString());
}
hadoopConf.setExtraOptions(ftpOptions);
return hadoopConf;
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSink.java
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSink.java
index 031d442f20..f4b271e035 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSink.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSink.java
@@ -17,46 +17,19 @@
package org.apache.seatunnel.connectors.seatunnel.file.ftp.sink;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
-import org.apache.seatunnel.api.sink.SeaTunnelSink;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
-import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
import org.apache.seatunnel.connectors.seatunnel.file.ftp.config.FtpConf;
-import
org.apache.seatunnel.connectors.seatunnel.file.ftp.config.FtpConfigOptions;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSink;
-
-import com.google.auto.service.AutoService;
+import
org.apache.seatunnel.connectors.seatunnel.file.sink.BaseMultipleTableFileSink;
-@AutoService(SeaTunnelSink.class)
-public class FtpFileSink extends BaseFileSink {
+public class FtpFileSink extends BaseMultipleTableFileSink {
@Override
public String getPluginName() {
return FileSystemType.FTP.getFileSystemPluginName();
}
- @Override
- public void prepare(Config pluginConfig) throws PrepareFailException {
- CheckResult result =
- CheckConfigUtil.checkAllExists(
- pluginConfig,
- FtpConfigOptions.FTP_HOST.key(),
- FtpConfigOptions.FTP_PORT.key(),
- FtpConfigOptions.FTP_USERNAME.key(),
- FtpConfigOptions.FTP_PASSWORD.key());
- if (!result.isSuccess()) {
- throw new FileConnectorException(
- SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
- String.format(
- "PluginName: %s, PluginType: %s, Message: %s",
- getPluginName(), PluginType.SINK,
result.getMsg()));
- }
- super.prepare(pluginConfig);
- hadoopConf = FtpConf.buildWithConfig(pluginConfig);
+ public FtpFileSink(ReadonlyConfig readonlyConfig, CatalogTable
catalogTable) {
+ super(FtpConf.buildWithConfig(readonlyConfig), readonlyConfig,
catalogTable);
}
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSinkFactory.java
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSinkFactory.java
index 24a9ed48f8..cfd2351a5c 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSinkFactory.java
@@ -17,18 +17,27 @@
package org.apache.seatunnel.connectors.seatunnel.file.ftp.sink;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.sink.SinkCommonOptions;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
-import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
+import
org.apache.seatunnel.connectors.seatunnel.file.factory.BaseMultipleTableFileSinkFactory;
import
org.apache.seatunnel.connectors.seatunnel.file.ftp.config.FtpConfigOptions;
+import
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo;
+import
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.state.FileSinkState;
import com.google.auto.service.AutoService;
@AutoService(Factory.class)
-public class FtpFileSinkFactory implements TableSinkFactory {
+public class FtpFileSinkFactory extends BaseMultipleTableFileSinkFactory {
@Override
public String factoryIdentifier() {
return FileSystemType.FTP.getFileSystemPluginName();
@@ -42,7 +51,11 @@ public class FtpFileSinkFactory implements TableSinkFactory {
.required(FtpConfigOptions.FTP_PORT)
.required(FtpConfigOptions.FTP_USERNAME)
.required(FtpConfigOptions.FTP_PASSWORD)
+ .optional(SinkCommonOptions.MULTI_TABLE_SINK_REPLICA)
+ .optional(BaseSinkConfig.TMP_PATH)
.optional(BaseSinkConfig.FILE_FORMAT_TYPE)
+ .optional(BaseSinkConfig.SCHEMA_SAVE_MODE)
+ .optional(BaseSinkConfig.DATA_SAVE_MODE)
.conditional(
BaseSinkConfig.FILE_FORMAT_TYPE,
FileFormat.TEXT,
@@ -94,4 +107,12 @@ public class FtpFileSinkFactory implements TableSinkFactory
{
.optional(FtpConfigOptions.FTP_CONNECTION_MODE)
.build();
}
+
+ @Override
+ public TableSink<SeaTunnelRow, FileSinkState, FileCommitInfo,
FileAggregatedCommitInfo>
+ createSink(TableSinkFactoryContext context) {
+ ReadonlyConfig readonlyConfig = context.getOptions();
+ CatalogTable catalogTable = context.getCatalogTable();
+ return () -> new FtpFileSink(readonlyConfig, catalogTable);
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java
index b032717cab..d6f0f64abb 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
@@ -78,7 +79,7 @@ public class FtpFileSource extends BaseFileSource {
"Ftp file source connector only support read [text, csv,
json] files");
}
String path = pluginConfig.getString(FtpConfigOptions.FILE_PATH.key());
- hadoopConf = FtpConf.buildWithConfig(pluginConfig);
+ hadoopConf =
FtpConf.buildWithConfig(ReadonlyConfig.fromConfig(pluginConfig));
readStrategy =
ReadStrategyFactory.of(
pluginConfig.getString(FtpConfigOptions.FILE_FORMAT_TYPE.key()));
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java
index 2a1598bf32..1b89a0bcc7 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java
@@ -25,17 +25,27 @@ import org.apache.seatunnel.e2e.common.container.TestHelper;
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
import org.apache.seatunnel.e2e.common.util.ContainerUtil;
+import org.apache.commons.lang3.StringUtils;
+
import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.TestTemplate;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;
+import
org.testcontainers.shaded.com.github.dockerjava.core.command.ExecStartResultCallback;
+import com.github.dockerjava.api.command.ExecCreateCmdResponse;
+import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.List;
import java.util.stream.Stream;
@DisabledOnContainer(
@@ -143,6 +153,69 @@ public class FtpFileIT extends TestSuiteBase implements
TestResource {
helper.execute("/excel/fake_source_to_ftp_root_path_excel.conf");
}
+ @TestTemplate
+ @DisabledOnContainer(
+ value = {},
+ type = {EngineType.FLINK},
+ disabledReason = "Flink dosen't support multi-table at now")
+ public void testMultipleTableAndSaveMode(TestContainer container)
+ throws IOException, InterruptedException {
+ TestHelper helper = new TestHelper(container);
+ // test mult table and save_mode:RECREATE_SCHEMA DROP_DATA
+ String homePath = "/home/vsftpd/seatunnel";
+ String path1 = "/tmp/seatunnel_mult/text/source_1";
+ String path2 = "/tmp/seatunnel_mult/text/source_2";
+ Assertions.assertEquals(getFileListFromContainer(homePath +
path1).size(), 0);
+ Assertions.assertEquals(getFileListFromContainer(homePath +
path2).size(), 0);
+ helper.execute("/text/multiple_table_fake_to_ftp_file_text.conf");
+ Assertions.assertEquals(getFileListFromContainer(homePath +
path1).size(), 1);
+ Assertions.assertEquals(getFileListFromContainer(homePath +
path2).size(), 1);
+ helper.execute("/text/multiple_table_fake_to_ftp_file_text.conf");
+ Assertions.assertEquals(getFileListFromContainer(homePath +
path1).size(), 1);
+ Assertions.assertEquals(getFileListFromContainer(homePath +
path2).size(), 1);
+ // test mult table and save_mode:CREATE_SCHEMA_WHEN_NOT_EXIST
APPEND_DATA
+ String path3 = "/tmp/seatunnel_mult2/text/source_1";
+ String path4 = "/tmp/seatunnel_mult2/text/source_2";
+ Assertions.assertEquals(getFileListFromContainer(homePath +
path3).size(), 0);
+ Assertions.assertEquals(getFileListFromContainer(homePath +
path4).size(), 0);
+ helper.execute("/text/multiple_table_fake_to_ftp_file_text_2.conf");
+ Assertions.assertEquals(getFileListFromContainer(homePath +
path3).size(), 1);
+ Assertions.assertEquals(getFileListFromContainer(homePath +
path4).size(), 1);
+ helper.execute("/text/multiple_table_fake_to_ftp_file_text_2.conf");
+ Assertions.assertEquals(getFileListFromContainer(homePath +
path3).size(), 2);
+ Assertions.assertEquals(getFileListFromContainer(homePath +
path4).size(), 2);
+ }
+
+ @SneakyThrows
+ private List<String> getFileListFromContainer(String path) {
+ String command = "ls -1 " + path;
+ ExecCreateCmdResponse execCreateCmdResponse =
+ dockerClient
+ .execCreateCmd(ftpContainer.getContainerId())
+ .withCmd("sh", "-c", command)
+ .withAttachStdout(true)
+ .withAttachStderr(true)
+ .exec();
+
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ dockerClient
+ .execStartCmd(execCreateCmdResponse.getId())
+ .exec(new ExecStartResultCallback(outputStream, System.err))
+ .awaitCompletion();
+
+ String output = new String(outputStream.toByteArray(),
StandardCharsets.UTF_8).trim();
+ List<String> fileList = new ArrayList<>();
+ log.info("container path file list is :{}", output);
+ String[] files = output.split("\n");
+ for (String file : files) {
+ if (StringUtils.isNotEmpty(file)) {
+ log.info("container path file name is :{}", file);
+ fileList.add(file);
+ }
+ }
+ return fileList;
+ }
+
@AfterAll
@Override
public void tearDown() {
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/text/multiple_table_fake_to_ftp_file_text.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/text/multiple_table_fake_to_ftp_file_text.conf
new file mode 100644
index 0000000000..cd28e54399
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/text/multiple_table_fake_to_ftp_file_text.conf
@@ -0,0 +1,105 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+
+ # You can set spark configuration here
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 1
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ FakeSource {
+ result_table_name = "ftp"
+ tables_configs = [
+ {
+ schema = {
+ table = "source_1"
+ fields {
+ id = int
+ val_bool = boolean
+ val_tinyint = tinyint
+ val_smallint = smallint
+ val_int = int
+ val_bigint = bigint
+ val_float = float
+ val_double = double
+ val_decimal = "decimal(16, 1)"
+ val_string = string
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW"]
+ }
+ ]
+ },
+ {
+ schema = {
+ table = "source_2"
+ fields {
+ id = int
+ val_bool = boolean
+ val_tinyint = tinyint
+ val_smallint = smallint
+ val_int = int
+ val_bigint = bigint
+ val_float = float
+ val_double = double
+ val_decimal = "decimal(16, 1)"
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3]
+ }
+ ]
+ }
+ ]
+ }
+}
+
+transform {
+}
+
+sink {
+ FtpFile {
+ host = "ftp"
+ port = 21
+ user = seatunnel
+ password = pass
+ path = "/tmp/seatunnel_mult/text/${table_name}"
+ source_table_name = "ftp"
+ row_delimiter = "\n"
+ partition_dir_expression = "${k0}=${v0}"
+ is_partition_field_write_in_file = true
+ file_name_expression = "${transactionId}_${now}"
+ file_format_type = "text"
+ filename_time_format = "yyyy.MM.dd"
+ is_enable_transaction = true
+ compress_codec = "lzo"
+ "schema_save_mode"="RECREATE_SCHEMA"
+ "data_save_mode"="DROP_DATA"
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/text/multiple_table_fake_to_ftp_file_text_2.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/text/multiple_table_fake_to_ftp_file_text_2.conf
new file mode 100644
index 0000000000..e05a14ab86
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/text/multiple_table_fake_to_ftp_file_text_2.conf
@@ -0,0 +1,105 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+
+ # You can set spark configuration here
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 1
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ FakeSource {
+ result_table_name = "ftp"
+ tables_configs = [
+ {
+ schema = {
+ table = "source_1"
+ fields {
+ id = int
+ val_bool = boolean
+ val_tinyint = tinyint
+ val_smallint = smallint
+ val_int = int
+ val_bigint = bigint
+ val_float = float
+ val_double = double
+ val_decimal = "decimal(16, 1)"
+ val_string = string
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW"]
+ }
+ ]
+ },
+ {
+ schema = {
+ table = "source_2"
+ fields {
+ id = int
+ val_bool = boolean
+ val_tinyint = tinyint
+ val_smallint = smallint
+ val_int = int
+ val_bigint = bigint
+ val_float = float
+ val_double = double
+ val_decimal = "decimal(16, 1)"
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3]
+ }
+ ]
+ }
+ ]
+ }
+}
+
+transform {
+}
+
+sink {
+ FtpFile {
+ host = "ftp"
+ port = 21
+ user = seatunnel
+ password = pass
+ path = "/tmp/seatunnel_mult2/text/${table_name}"
+ source_table_name = "ftp"
+ row_delimiter = "\n"
+ partition_dir_expression = "${k0}=${v0}"
+ is_partition_field_write_in_file = true
+ file_name_expression = "${transactionId}_${now}"
+ file_format_type = "text"
+ filename_time_format = "yyyy.MM.dd"
+ is_enable_transaction = true
+ compress_codec = "lzo"
+ "schema_save_mode"="CREATE_SCHEMA_WHEN_NOT_EXIST"
+ "data_save_mode"="APPEND_DATA"
+ }
+}
\ No newline at end of file