This is an automated email from the ASF dual-hosted git repository.
liugddx pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 22fe27a3d6 [Feature][Connector-V2]Ftp file source support multiple
table (#7795)
22fe27a3d6 is described below
commit 22fe27a3d69e92c7697af6f62dedd903c259f12a
Author: 老王 <[email protected]>
AuthorDate: Tue Oct 8 21:18:45 2024 +0800
[Feature][Connector-V2]Ftp file source support multiple table (#7795)
---
docs/en/connector-v2/source/FtpFile.md | 61 +++++++++
.../file/ftp/config/FTPFileSourceConfig.java | 45 +++++++
.../config/MultipleTableFTPFileSourceConfig.java | 34 +++++
.../seatunnel/file/ftp/source/FtpFileSource.java | 115 +----------------
.../file/ftp/source/FtpFileSourceFactory.java | 22 +++-
.../e2e/connector/file/ftp/FtpFileIT.java | 36 +++++-
...ftp_file_json_to_assert_with_multipletable.conf | 140 +++++++++++++++++++++
7 files changed, 334 insertions(+), 119 deletions(-)
diff --git a/docs/en/connector-v2/source/FtpFile.md
b/docs/en/connector-v2/source/FtpFile.md
index 656f7a0042..ec02f77f9f 100644
--- a/docs/en/connector-v2/source/FtpFile.md
+++ b/docs/en/connector-v2/source/FtpFile.md
@@ -306,6 +306,67 @@ Source plugin common parameters, please refer to [Source
Common Options](../sour
```
+### Multiple Table
+
+```hocon
+
+FtpFile {
+ tables_configs = [
+ {
+ schema {
+ table = "student"
+ }
+ path = "/tmp/seatunnel/sink/text"
+ host = "192.168.31.48"
+ port = 21
+ user = tyrantlucifer
+ password = tianchao
+ file_format_type = "parquet"
+ },
+ {
+ schema {
+ table = "teacher"
+ }
+ path = "/tmp/seatunnel/sink/text"
+ host = "192.168.31.48"
+ port = 21
+ user = tyrantlucifer
+ password = tianchao
+ file_format_type = "parquet"
+ }
+ ]
+}
+
+```
+
+```hocon
+
+FtpFile {
+ tables_configs = [
+ {
+ schema {
+ fields {
+ name = string
+ age = int
+ }
+ }
+ path = "/apps/hive/demo/student"
+ file_format_type = "json"
+ },
+ {
+ schema {
+ fields {
+ name = string
+ age = int
+ }
+ }
+ path = "/apps/hive/demo/teacher"
+ file_format_type = "json"
+ }
+ ]
+}
+
+```
+
### Transfer Binary File
```hocon
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/config/FTPFileSourceConfig.java
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/config/FTPFileSourceConfig.java
new file mode 100644
index 0000000000..8677ed29d4
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/config/FTPFileSourceConfig.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.ftp.config;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import
org.apache.seatunnel.connectors.seatunnel.file.config.BaseFileSourceConfig;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+
+import lombok.Getter;
+
+/**
+ * File source configuration for the FTP connector.
+ *
+ * <p>Specialises {@link BaseFileSourceConfig} by building its Hadoop configuration through
+ * {@link FtpConf} and by reporting the FTP plugin name.
+ */
+@Getter
+public class FTPFileSourceConfig extends BaseFileSourceConfig {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Creates the configuration by handing the options to the base class.
+     *
+     * @param readonlyConfig options this configuration is built from
+     */
+    public FTPFileSourceConfig(ReadonlyConfig readonlyConfig) {
+        super(readonlyConfig);
+    }
+
+    /** Returns the plugin name registered for {@link FileSystemType#FTP}. */
+    @Override
+    public String getPluginName() {
+        return FileSystemType.FTP.getFileSystemPluginName();
+    }
+
+    /** Builds the Hadoop configuration from the options held by the base class. */
+    @Override
+    public HadoopConf getHadoopConfig() {
+        return FtpConf.buildWithConfig(getBaseFileSourceConfig());
+    }
+}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/config/MultipleTableFTPFileSourceConfig.java
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/config/MultipleTableFTPFileSourceConfig.java
new file mode 100644
index 0000000000..78a04c648e
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/config/MultipleTableFTPFileSourceConfig.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.ftp.config;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import
org.apache.seatunnel.connectors.seatunnel.file.config.BaseFileSourceConfig;
+import
org.apache.seatunnel.connectors.seatunnel.file.config.BaseMultipleTableFileSourceConfig;
+
+/**
+ * Multiple-table source configuration for the FTP file connector.
+ *
+ * <p>Extends {@link BaseMultipleTableFileSourceConfig} and creates an {@link FTPFileSourceConfig}
+ * for every {@link ReadonlyConfig} passed to {@link #getBaseSourceConfig(ReadonlyConfig)}.
+ */
+public class MultipleTableFTPFileSourceConfig extends BaseMultipleTableFileSourceConfig {
+
+    /**
+     * Creates the configuration by handing the options to the base class.
+     *
+     * @param ftpFileSourceRootConfig options of the FTP file source, forwarded unchanged
+     */
+    public MultipleTableFTPFileSourceConfig(ReadonlyConfig ftpFileSourceRootConfig) {
+        super(ftpFileSourceRootConfig);
+    }
+
+    /**
+     * Creates the FTP-specific source configuration for the given options.
+     *
+     * @param readonlyConfig options to wrap
+     * @return a new {@link FTPFileSourceConfig} built from {@code readonlyConfig}
+     */
+    @Override
+    public BaseFileSourceConfig getBaseSourceConfig(ReadonlyConfig readonlyConfig) {
+        return new FTPFileSourceConfig(readonlyConfig);
+    }
+}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java
index d6f0f64abb..b8e798ba0a 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java
@@ -17,121 +17,18 @@
package org.apache.seatunnel.connectors.seatunnel.file.ftp.source;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
-import org.apache.seatunnel.api.source.SeaTunnelSource;
-import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
-import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.common.constants.PluginType;
-import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
-import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
-import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
-import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
-import org.apache.seatunnel.connectors.seatunnel.file.ftp.config.FtpConf;
-import
org.apache.seatunnel.connectors.seatunnel.file.ftp.config.FtpConfigOptions;
-import org.apache.seatunnel.connectors.seatunnel.file.source.BaseFileSource;
-import
org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategyFactory;
-
-import com.google.auto.service.AutoService;
+import
org.apache.seatunnel.connectors.seatunnel.file.ftp.config.MultipleTableFTPFileSourceConfig;
+import
org.apache.seatunnel.connectors.seatunnel.file.source.BaseMultipleTableFileSource;
-import java.io.IOException;
+public class FtpFileSource extends BaseMultipleTableFileSource {
+ public FtpFileSource(ReadonlyConfig readonlyConfig) {
+ super(new MultipleTableFTPFileSourceConfig(readonlyConfig));
+ }
-@AutoService(SeaTunnelSource.class)
-public class FtpFileSource extends BaseFileSource {
@Override
public String getPluginName() {
return FileSystemType.FTP.getFileSystemPluginName();
}
-
- @Override
- public void prepare(Config pluginConfig) throws PrepareFailException {
- CheckResult result =
- CheckConfigUtil.checkAllExists(
- pluginConfig,
- FtpConfigOptions.FILE_PATH.key(),
- FtpConfigOptions.FILE_FORMAT_TYPE.key(),
- FtpConfigOptions.FTP_HOST.key(),
- FtpConfigOptions.FTP_PORT.key(),
- FtpConfigOptions.FTP_USERNAME.key(),
- FtpConfigOptions.FTP_PASSWORD.key());
- if (!result.isSuccess()) {
- throw new FileConnectorException(
- SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
- String.format(
- "PluginName: %s, PluginType: %s, Message: %s",
- getPluginName(), PluginType.SOURCE,
result.getMsg()));
- }
- FileFormat fileFormat =
- FileFormat.valueOf(
- pluginConfig
-
.getString(FtpConfigOptions.FILE_FORMAT_TYPE.key())
- .toUpperCase());
- if (fileFormat == FileFormat.ORC || fileFormat == FileFormat.PARQUET) {
- throw new FileConnectorException(
- CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
- "Ftp file source connector only support read [text, csv,
json] files");
- }
- String path = pluginConfig.getString(FtpConfigOptions.FILE_PATH.key());
- hadoopConf =
FtpConf.buildWithConfig(ReadonlyConfig.fromConfig(pluginConfig));
- readStrategy =
- ReadStrategyFactory.of(
-
pluginConfig.getString(FtpConfigOptions.FILE_FORMAT_TYPE.key()));
- readStrategy.setPluginConfig(pluginConfig);
- readStrategy.init(hadoopConf);
- try {
- filePaths = readStrategy.getFileNamesByPath(path);
- } catch (IOException e) {
- String errorMsg = String.format("Get file list from this path [%s]
failed", path);
- throw new FileConnectorException(
- FileConnectorErrorCode.FILE_LIST_GET_FAILED, errorMsg, e);
- }
- // support user-defined schema
- // only json type support user-defined schema now
- if (pluginConfig.hasPath(TableSchemaOptions.SCHEMA.key())) {
- switch (fileFormat) {
- case CSV:
- case TEXT:
- case JSON:
- case EXCEL:
- case XML:
- SeaTunnelRowType userDefinedSchema =
-
CatalogTableUtil.buildWithConfig(pluginConfig).getSeaTunnelRowType();
- readStrategy.setSeaTunnelRowTypeInfo(userDefinedSchema);
- rowType = readStrategy.getActualSeaTunnelRowTypeInfo();
- break;
- case ORC:
- case PARQUET:
- case BINARY:
- throw new FileConnectorException(
- CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION,
- "SeaTunnel does not support user-defined schema
for [parquet, orc, binary] files");
- default:
- // never got in there
- throw new FileConnectorException(
- CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
- "SeaTunnel does not supported this file format");
- }
- } else {
- if (filePaths.isEmpty()) {
- // When the directory is empty, distribute default behavior
schema
- rowType = CatalogTableUtil.buildSimpleTextSchema();
- return;
- }
- try {
- rowType =
readStrategy.getSeaTunnelRowTypeInfo(filePaths.get(0));
- } catch (FileConnectorException e) {
- String errorMsg =
- String.format("Get table schema from file [%s]
failed", filePaths.get(0));
- throw new FileConnectorException(
- CommonErrorCodeDeprecated.TABLE_SCHEMA_GET_FAILED,
errorMsg, e);
- }
- }
- }
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSourceFactory.java
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSourceFactory.java
index 112cccc3af..9bb4b98e05 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSourceFactory.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSourceFactory.java
@@ -19,9 +19,12 @@ package
org.apache.seatunnel.connectors.seatunnel.file.ftp.source;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
+import org.apache.seatunnel.api.table.connector.TableSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
import
org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
@@ -29,6 +32,7 @@ import
org.apache.seatunnel.connectors.seatunnel.file.ftp.config.FtpConfigOption
import com.google.auto.service.AutoService;
+import java.io.Serializable;
import java.util.Arrays;
@AutoService(Factory.class)
@@ -38,15 +42,21 @@ public class FtpFileSourceFactory implements
TableSourceFactory {
return FileSystemType.FTP.getFileSystemPluginName();
}
+    /**
+     * Creates the table source for the FTP file connector.
+     *
+     * <p>The returned {@link TableSource} builds a new {@link FtpFileSource} from
+     * {@code context.getOptions()} whenever it is invoked.
+     *
+     * @param context factory context supplying the connector options
+     * @return a table source that produces {@link FtpFileSource} instances
+     */
+    @Override
+    // FtpFileSource is not generic, so the cast to the caller's type parameters is unchecked.
+    @SuppressWarnings("unchecked")
+    public <T, SplitT extends SourceSplit, StateT extends Serializable>
+            TableSource<T, SplitT, StateT> createSource(TableSourceFactoryContext context) {
+        return () ->
+                (SeaTunnelSource<T, SplitT, StateT>) new FtpFileSource(context.getOptions());
+    }
+
@Override
public OptionRule optionRule() {
return OptionRule.builder()
- .required(FtpConfigOptions.FILE_PATH)
- .required(FtpConfigOptions.FTP_HOST)
- .required(FtpConfigOptions.FTP_PORT)
- .required(FtpConfigOptions.FTP_USERNAME)
- .required(FtpConfigOptions.FTP_PASSWORD)
- .required(FtpConfigOptions.FILE_FORMAT_TYPE)
+ .optional(FtpConfigOptions.FILE_PATH)
+ .optional(FtpConfigOptions.FTP_HOST)
+ .optional(FtpConfigOptions.FTP_PORT)
+ .optional(FtpConfigOptions.FTP_USERNAME)
+ .optional(FtpConfigOptions.FTP_PASSWORD)
+ .optional(FtpConfigOptions.FILE_FORMAT_TYPE)
.conditional(
BaseSourceConfigOptions.FILE_FORMAT_TYPE,
FileFormat.TEXT,
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java
index 165d48e631..70b2463ea8 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java
@@ -158,13 +158,19 @@ public class FtpFileIT extends TestSuiteBase implements
TestResource {
helper.execute("/orc/fake_to_ftp_file_orc.conf");
// test write ftp root path excel file
helper.execute("/excel/fake_source_to_ftp_root_path_excel.conf");
+ // test ftp source support multipleTable
+
+ String homePath = "/home/vsftpd/seatunnel";
+ String sink01 = "/tmp/seatunnel/json/sink/multiplesource/fake01";
+ String sink02 = "/tmp/seatunnel/json/sink/multiplesource/fake02";
+ deleteFileFromContainer(homePath + sink01);
+ deleteFileFromContainer(homePath + sink02);
+
helper.execute("/json/ftp_file_json_to_assert_with_multipletable.conf");
+ Assertions.assertEquals(getFileListFromContainer(homePath +
sink01).size(), 1);
+ Assertions.assertEquals(getFileListFromContainer(homePath +
sink02).size(), 1);
}
@TestTemplate
- @DisabledOnContainer(
- value = {},
- type = {EngineType.FLINK},
- disabledReason = "Flink dosen't support multi-table at now")
public void testMultipleTableAndSaveMode(TestContainer container)
throws IOException, InterruptedException {
TestHelper helper = new TestHelper(container);
@@ -172,6 +178,8 @@ public class FtpFileIT extends TestSuiteBase implements
TestResource {
String homePath = "/home/vsftpd/seatunnel";
String path1 = "/tmp/seatunnel_mult/text/source_1";
String path2 = "/tmp/seatunnel_mult/text/source_2";
+ deleteFileFromContainer(homePath + path1);
+ deleteFileFromContainer(homePath + path2);
Assertions.assertEquals(getFileListFromContainer(homePath +
path1).size(), 0);
Assertions.assertEquals(getFileListFromContainer(homePath +
path2).size(), 0);
helper.execute("/text/multiple_table_fake_to_ftp_file_text.conf");
@@ -183,6 +191,8 @@ public class FtpFileIT extends TestSuiteBase implements
TestResource {
// test mult table and save_mode:CREATE_SCHEMA_WHEN_NOT_EXIST
APPEND_DATA
String path3 = "/tmp/seatunnel_mult2/text/source_1";
String path4 = "/tmp/seatunnel_mult2/text/source_2";
+ deleteFileFromContainer(homePath + path3);
+ deleteFileFromContainer(homePath + path4);
Assertions.assertEquals(getFileListFromContainer(homePath +
path3).size(), 0);
Assertions.assertEquals(getFileListFromContainer(homePath +
path4).size(), 0);
helper.execute("/text/multiple_table_fake_to_ftp_file_text_2.conf");
@@ -223,6 +233,24 @@ public class FtpFileIT extends TestSuiteBase implements
TestResource {
return fileList;
}
+    /**
+     * Runs {@code rm -rf} on the given path inside the FTP container and waits for the command
+     * to complete.
+     *
+     * @param path path inside the container to remove
+     */
+    @SneakyThrows
+    private void deleteFileFromContainer(String path) {
+        // Pass the arguments directly instead of concatenating them into a "sh -c" string, so
+        // the path is never word-split or glob-expanded by a shell.
+        ExecCreateCmdResponse execCreateCmdResponse =
+                dockerClient
+                        .execCreateCmd(ftpContainer.getContainerId())
+                        .withCmd("rm", "-rf", path)
+                        .withAttachStdout(true)
+                        .withAttachStderr(true)
+                        .exec();
+
+        // Stdout is captured into a throw-away buffer; stderr is forwarded to System.err.
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        dockerClient
+                .execStartCmd(execCreateCmdResponse.getId())
+                .exec(new ExecStartResultCallback(outputStream, System.err))
+                .awaitCompletion();
+    }
+
@AfterAll
@Override
public void tearDown() {
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/json/ftp_file_json_to_assert_with_multipletable.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/json/ftp_file_json_to_assert_with_multipletable.conf
new file mode 100644
index 0000000000..8fd613255c
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/json/ftp_file_json_to_assert_with_multipletable.conf
@@ -0,0 +1,140 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+
+ # You can set spark configuration here
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ FtpFile {
+ tables_configs = [
+ {
+ host = "ftp"
+ port = 21
+ user = seatunnel
+ password = pass
+ path = "/tmp/seatunnel/read/json"
+ file_format_type = "json"
+ schema = {
+ table = "fake01"
+ fields {
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ c_row = {
+ C_MAP = "map<string, string>"
+ C_ARRAY = "array<int>"
+ C_STRING = string
+ C_BOOLEAN = boolean
+ C_TINYINT = tinyint
+ C_SMALLINT = smallint
+ C_INT = int
+ C_BIGINT = bigint
+ C_FLOAT = float
+ C_DOUBLE = double
+ C_BYTES = bytes
+ C_DATE = date
+ C_DECIMAL = "decimal(38, 18)"
+ C_TIMESTAMP = timestamp
+ }
+ }
+ }
+ },
+ {
+ host = "ftp"
+ port = 21
+ user = seatunnel
+ password = pass
+ path = "/tmp/seatunnel/read/json"
+ file_format_type = "json"
+ schema = {
+ table = "fake02"
+ fields {
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ c_row = {
+ C_MAP = "map<string, string>"
+ C_ARRAY = "array<int>"
+ C_STRING = string
+ C_BOOLEAN = boolean
+ C_TINYINT = tinyint
+ C_SMALLINT = smallint
+ C_INT = int
+ C_BIGINT = bigint
+ C_FLOAT = float
+ C_DOUBLE = double
+ C_BYTES = bytes
+ C_DATE = date
+ C_DECIMAL = "decimal(38, 18)"
+ C_TIMESTAMP = timestamp
+ }
+ }
+ }
+ }
+ ]
+ result_table_name = "ftp"
+ }
+}
+
+sink {
+ FtpFile {
+ host = "ftp"
+ port = 21
+ user = seatunnel
+ password = pass
+ path = "/tmp/seatunnel/json/sink/multiplesource/${table_name}"
+ source_table_name = "ftp"
+ row_delimiter = "\n"
+ partition_dir_expression = "${k0}=${v0}"
+ is_partition_field_write_in_file = true
+ file_name_expression = "${transactionId}_${now}"
+ file_format_type = "json"
+ filename_time_format = "yyyy.MM.dd"
+ is_enable_transaction = true
+ }
+}
\ No newline at end of file