This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 72be6663ad [Feature] LocalFileSource support multiple table
72be6663ad is described below
commit 72be6663ad7e2252d683e365df5462114eff2cd3
Author: Wenjun Ruan <[email protected]>
AuthorDate: Mon Nov 13 13:43:50 2023 +0800
[Feature] LocalFileSource support multiple table
---
docs/en/connector-v2/source/LocalFile.md | 92 +++++++++--
.../file/exception/FileConnectorErrorCode.java | 7 +-
.../file/source/BaseFileSourceReader.java | 4 +-
.../file/source/reader/ExcelReadStrategy.java | 3 +-
.../file/source/reader/JsonReadStrategy.java | 3 +-
.../file/source/reader/OrcReadStrategy.java | 3 +-
.../file/source/reader/ParquetReadStrategy.java | 3 +-
.../seatunnel/file/source/reader/ReadStrategy.java | 5 +-
.../file/source/reader/ReadStrategyFactory.java | 11 ++
.../file/source/reader/TextReadStrategy.java | 3 +-
.../seatunnel/file/writer/OrcReadStrategyTest.java | 4 +-
.../file/writer/ParquetReadStrategyTest.java | 12 +-
.../{LocalConf.java => LocalFileHadoopConf.java} | 8 +-
.../seatunnel/file/local/sink/LocalFileSink.java | 6 +-
.../file/local/source/LocalFileSource.java | 149 +++++++-----------
.../file/local/source/LocalFileSourceFactory.java | 17 +-
.../local/source/config/LocalFileSourceConfig.java | 171 +++++++++++++++++++++
...urceConfig.java => LocalFileSourceOptions.java} | 17 +-
.../config/MultipleTableLocalFileSourceConfig.java | 58 +++++++
.../reader/MultipleTableLocalFileSourceReader.java | 130 ++++++++++++++++
.../LocalFileSourceSplit.java} | 24 ++-
...ultipleTableLocalFileSourceSplitEnumerator.java | 161 +++++++++++++++++++
.../LocalFileSourceState.java} | 22 ++-
.../file/local/LocalFileWithMultipleTableIT.java | 103 +++++++++++++
.../local_excel_to_assert_with_multipletable.conf | 124 +++++++++++++++
...cal_file_json_to_assert_with_multipletable.conf | 120 +++++++++++++++
.../test/resources/json/local_file_to_console.conf | 4 +
...cal_file_orc_to_assert_with_multipletable.conf} | 25 ++-
...file_parquet_to_assert_with_multipletable.conf} | 25 ++-
...cal_file_text_to_assert_with_multipletable.conf | 120 +++++++++++++++
.../resources/write-cdc-changelog-to-kudu.conf | 86 +++++------
31 files changed, 1329 insertions(+), 191 deletions(-)
diff --git a/docs/en/connector-v2/source/LocalFile.md
b/docs/en/connector-v2/source/LocalFile.md
index f562fd30ae..4d20ca532d 100644
--- a/docs/en/connector-v2/source/LocalFile.md
+++ b/docs/en/connector-v2/source/LocalFile.md
@@ -41,22 +41,23 @@ If you use SeaTunnel Engine, It automatically integrated
the hadoop jar when you
## Options
-| name | type | required | default value |
-|---------------------------|---------|----------|---------------------|
-| path | string | yes | - |
-| file_format_type | string | yes | - |
-| read_columns | list | no | - |
-| delimiter/field_delimiter | string | no | \001 |
-| parse_partition_from_path | boolean | no | true |
-| date_format | string | no | yyyy-MM-dd |
-| datetime_format | string | no | yyyy-MM-dd HH:mm:ss |
-| time_format | string | no | HH:mm:ss |
-| skip_header_row_number | long | no | 0 |
-| schema | config | no | - |
-| sheet_name | string | no | - |
-| file_filter_pattern | string | no | - |
-| compress_codec | string | no | none |
-| common-options | | no | - |
+| name | type | required | default value
|
+|---------------------------|---------|----------|--------------------------------------|
+| path | string | yes | -
|
+| file_format_type | string | yes | -
|
+| read_columns | list | no | -
|
+| delimiter/field_delimiter | string | no | \001
|
+| parse_partition_from_path | boolean | no | true
|
+| date_format | string | no | yyyy-MM-dd
|
+| datetime_format | string | no | yyyy-MM-dd HH:mm:ss
|
+| time_format | string | no | HH:mm:ss
|
+| skip_header_row_number | long | no | 0
|
+| schema | config | no | -
|
+| sheet_name | string | no | -
|
+| file_filter_pattern | string | no | -
|
+| compress_codec | string | no | none
|
+| common-options | | no | -
|
+| tables_configs | list | no | used to define a multiple
table task |
### path [string]
@@ -244,8 +245,14 @@ The compress codec of files and the details that supported
as the following show
Source plugin common parameters, please refer to [Source Common
Options](common-options.md) for details
+### tables_configs
+
+Used to define a multiple-table task. When you have multiple tables to read,
you can use this option to define each of them.
+
## Example
+### One Table
+
```hocon
LocalFile {
@@ -270,6 +277,59 @@ LocalFile {
```
+### Multiple Table
+
+```hocon
+
+LocalFile {
+ tables_configs = [
+ {
+ schema {
+ table = "student"
+ }
+ path = "/apps/hive/demo/student"
+ file_format_type = "parquet"
+ },
+ {
+ schema {
+ table = "teacher"
+ }
+ path = "/apps/hive/demo/teacher"
+ file_format_type = "parquet"
+ }
+ ]
+}
+
+```
+
+```hocon
+
+LocalFile {
+ tables_configs = [
+ {
+ schema {
+ fields {
+ name = string
+ age = int
+ }
+ }
+ path = "/apps/hive/demo/student"
+ file_format_type = "json"
+ },
+ {
+ schema {
+ fields {
+ name = string
+ age = int
+ }
+ }
+ path = "/apps/hive/demo/teacher"
+ file_format_type = "json"
+ }
+ ]
+}
+
+```
+
## Changelog
### 2.2.0-beta 2022-09-26
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/exception/FileConnectorErrorCode.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/exception/FileConnectorErrorCode.java
index 65e9590f34..27dca4a6bc 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/exception/FileConnectorErrorCode.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/exception/FileConnectorErrorCode.java
@@ -23,7 +23,12 @@ public enum FileConnectorErrorCode implements
SeaTunnelErrorCode {
FILE_TYPE_INVALID("FILE-01", "File type is invalid"),
DATA_DESERIALIZE_FAILED("FILE-02", "Data deserialization failed"),
FILE_LIST_GET_FAILED("FILE-03", "Get file list failed"),
- FILE_LIST_EMPTY("FILE-04", "File list is empty");
+ FILE_LIST_EMPTY("FILE-04", "File list is empty"),
+ AGGREGATE_COMMIT_ERROR("FILE-05", "Aggregate committer error"),
+ FILE_READ_STRATEGY_NOT_SUPPORT("FILE-06", "File strategy not support"),
+ FORMAT_NOT_SUPPORT("FILE-07", "Format not support"),
+ FILE_READ_FAILED("FILE-08", "File read failed"),
+ ;
private final String code;
private final String description;
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/BaseFileSourceReader.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/BaseFileSourceReader.java
index 18332b9436..7082b6171e 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/BaseFileSourceReader.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/BaseFileSourceReader.java
@@ -63,7 +63,9 @@ public class BaseFileSourceReader implements
SourceReader<SeaTunnelRow, FileSour
FileSourceSplit split = sourceSplits.poll();
if (null != split) {
try {
- readStrategy.read(split.splitId(), output);
+ // todo: If there is only one table, the tableId is not
needed, but it's better
+ // to set this
+ readStrategy.read(split.splitId(), "", output);
} catch (Exception e) {
String errorMsg =
String.format("Read data from this file [%s]
failed", split.splitId());
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ExcelReadStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ExcelReadStrategy.java
index 02f6c30772..ba7865b315 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ExcelReadStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ExcelReadStrategy.java
@@ -74,7 +74,7 @@ public class ExcelReadStrategy extends AbstractReadStrategy {
@SneakyThrows
@Override
- public void read(String path, Collector<SeaTunnelRow> output) {
+ public void read(String path, String tableId, Collector<SeaTunnelRow>
output) {
Configuration conf = getConfiguration();
FileSystem fs = FileSystem.get(conf);
Map<String, String> partitionsMap = parsePartitionsByPath(path);
@@ -124,6 +124,7 @@ public class ExcelReadStrategy extends AbstractReadStrategy
{
seaTunnelRow.setField(index++, value);
}
}
+ seaTunnelRow.setTableId(tableId);
output.collect(seaTunnelRow);
});
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/JsonReadStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/JsonReadStrategy.java
index 9095f97236..d8dccd86de 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/JsonReadStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/JsonReadStrategy.java
@@ -69,7 +69,7 @@ public class JsonReadStrategy extends AbstractReadStrategy {
}
@Override
- public void read(String path, Collector<SeaTunnelRow> output)
+ public void read(String path, String tableId, Collector<SeaTunnelRow>
output)
throws FileConnectorException, IOException {
Configuration conf = getConfiguration();
FileSystem fs = FileSystem.get(conf);
@@ -105,6 +105,7 @@ public class JsonReadStrategy extends AbstractReadStrategy {
seaTunnelRow.setField(index++,
value);
}
}
+ seaTunnelRow.setTableId(tableId);
output.collect(seaTunnelRow);
} catch (IOException e) {
String errorMsg =
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/OrcReadStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/OrcReadStrategy.java
index 4bdbac1e96..56c782e3d0 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/OrcReadStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/OrcReadStrategy.java
@@ -74,7 +74,7 @@ public class OrcReadStrategy extends AbstractReadStrategy {
private static final long MIN_SIZE = 16 * 1024;
@Override
- public void read(String path, Collector<SeaTunnelRow> output)
+ public void read(String path, String tableId, Collector<SeaTunnelRow>
output)
throws FileConnectorException, IOException {
if (Boolean.FALSE.equals(checkFileType(path))) {
String errorMsg =
@@ -120,6 +120,7 @@ public class OrcReadStrategy extends AbstractReadStrategy {
}
}
SeaTunnelRow seaTunnelRow = new SeaTunnelRow(fields);
+ seaTunnelRow.setTableId(tableId);
output.collect(seaTunnelRow);
num++;
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
index 944d3ee86a..e7a0c0af4a 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
@@ -81,7 +81,7 @@ public class ParquetReadStrategy extends AbstractReadStrategy
{
private int[] indexes;
@Override
- public void read(String path, Collector<SeaTunnelRow> output)
+ public void read(String path, String tableId, Collector<SeaTunnelRow>
output)
throws FileConnectorException, IOException {
if (Boolean.FALSE.equals(checkFileType(path))) {
String errorMsg =
@@ -119,6 +119,7 @@ public class ParquetReadStrategy extends
AbstractReadStrategy {
fields[i] = resolveObject(data,
seaTunnelRowType.getFieldType(i));
}
SeaTunnelRow seaTunnelRow = new SeaTunnelRow(fields);
+ seaTunnelRow.setTableId(tableId);
output.collect(seaTunnelRow);
}
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategy.java
index b53e97140e..3f1a869b18 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategy.java
@@ -36,17 +36,20 @@ public interface ReadStrategy extends Serializable {
Configuration getConfiguration(HadoopConf conf);
- void read(String path, Collector<SeaTunnelRow> output)
+ void read(String path, String tableId, Collector<SeaTunnelRow> output)
throws IOException, FileConnectorException;
SeaTunnelRowType getSeaTunnelRowTypeInfo(HadoopConf hadoopConf, String
path)
throws FileConnectorException;
+ // todo: use CatalogTable
void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType);
List<String> getFileNamesByPath(HadoopConf hadoopConf, String path) throws
IOException;
+ // todo: use ReadonlyConfig
void setPluginConfig(Config pluginConfig);
+ // todo: use CatalogTable
SeaTunnelRowType getActualSeaTunnelRowTypeInfo();
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategyFactory.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategyFactory.java
index 3aa1874edf..777490c031 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategyFactory.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategyFactory.java
@@ -17,8 +17,11 @@
package org.apache.seatunnel.connectors.seatunnel.file.source.reader;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
+import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
import lombok.extern.slf4j.Slf4j;
@@ -28,6 +31,14 @@ public class ReadStrategyFactory {
private ReadStrategyFactory() {}
+ public static ReadStrategy of(ReadonlyConfig readonlyConfig, HadoopConf
hadoopConf) {
+ ReadStrategy readStrategy =
+
of(readonlyConfig.get(BaseSourceConfig.FILE_FORMAT_TYPE).name());
+ readStrategy.setPluginConfig(readonlyConfig.toConfig());
+ readStrategy.init(hadoopConf);
+ return readStrategy;
+ }
+
public static ReadStrategy of(String fileType) {
try {
FileFormat fileFormat = FileFormat.valueOf(fileType.toUpperCase());
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
index 816e50b57b..586d165e1b 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
@@ -64,7 +64,7 @@ public class TextReadStrategy extends AbstractReadStrategy {
private int[] indexes;
@Override
- public void read(String path, Collector<SeaTunnelRow> output)
+ public void read(String path, String tableId, Collector<SeaTunnelRow>
output)
throws FileConnectorException, IOException {
Configuration conf = getConfiguration();
FileSystem fs = FileSystem.get(conf);
@@ -118,6 +118,7 @@ public class TextReadStrategy extends AbstractReadStrategy {
seaTunnelRow.setField(index++,
value);
}
}
+ seaTunnelRow.setTableId(tableId);
output.collect(seaTunnelRow);
} catch (IOException e) {
String errorMsg =
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/OrcReadStrategyTest.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/OrcReadStrategyTest.java
index 5e8eb9a2c8..56fbaae386 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/OrcReadStrategyTest.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/OrcReadStrategyTest.java
@@ -52,7 +52,7 @@ public class OrcReadStrategyTest {
orcReadStrategy.getSeaTunnelRowTypeInfo(localConf,
orcFilePath);
Assertions.assertNotNull(seaTunnelRowTypeInfo);
System.out.println(seaTunnelRowTypeInfo);
- orcReadStrategy.read(orcFilePath, testCollector);
+ orcReadStrategy.read(orcFilePath, "", testCollector);
for (SeaTunnelRow row : testCollector.getRows()) {
Assertions.assertEquals(row.getField(0).getClass(), Boolean.class);
Assertions.assertEquals(row.getField(1).getClass(), Byte.class);
@@ -78,7 +78,7 @@ public class OrcReadStrategyTest {
orcReadStrategy.getSeaTunnelRowTypeInfo(localConf,
orcFilePath);
Assertions.assertNotNull(seaTunnelRowTypeInfo);
System.out.println(seaTunnelRowTypeInfo);
- orcReadStrategy.read(orcFilePath, testCollector);
+ orcReadStrategy.read(orcFilePath, "", testCollector);
for (SeaTunnelRow row : testCollector.getRows()) {
Assertions.assertEquals(row.getField(0).getClass(), Byte.class);
Assertions.assertEquals(row.getField(1).getClass(), Boolean.class);
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetReadStrategyTest.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetReadStrategyTest.java
index 0a3bee6282..1c36a91453 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetReadStrategyTest.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetReadStrategyTest.java
@@ -53,7 +53,7 @@ public class ParquetReadStrategyTest {
Assertions.assertNotNull(seaTunnelRowTypeInfo);
System.out.println(seaTunnelRowTypeInfo);
TestCollector testCollector = new TestCollector();
- parquetReadStrategy.read(path, testCollector);
+ parquetReadStrategy.read(path, "", testCollector);
}
@Test
@@ -69,7 +69,7 @@ public class ParquetReadStrategyTest {
Assertions.assertNotNull(seaTunnelRowTypeInfo);
System.out.println(seaTunnelRowTypeInfo);
TestCollector testCollector = new TestCollector();
- parquetReadStrategy.read(path, testCollector);
+ parquetReadStrategy.read(path, "", testCollector);
}
@Test
@@ -88,13 +88,13 @@ public class ParquetReadStrategyTest {
TimeZone tz1 = TimeZone.getTimeZone("Asia/Shanghai");
TimeZone.setDefault(tz1);
TestCollector testCollector = new TestCollector();
- parquetReadStrategy.read(path, testCollector);
+ parquetReadStrategy.read(path, "", testCollector);
LocalDateTime time1 = (LocalDateTime)
testCollector.getRows().get(0).getField(index);
TimeZone tz2 = TimeZone.getTimeZone("UTC");
TimeZone.setDefault(tz2);
TestCollector testCollector2 = new TestCollector();
- parquetReadStrategy.read(path, testCollector2);
+ parquetReadStrategy.read(path, "", testCollector2);
LocalDateTime time2 = (LocalDateTime)
testCollector2.getRows().get(0).getField(index);
Assertions.assertTrue(time1.isAfter(time2));
@@ -121,7 +121,7 @@ public class ParquetReadStrategyTest {
Assertions.assertNotNull(seaTunnelRowTypeInfo);
System.out.println(seaTunnelRowTypeInfo);
TestCollector testCollector = new TestCollector();
- parquetReadStrategy.read(path, testCollector);
+ parquetReadStrategy.read(path, "", testCollector);
List<SeaTunnelRow> rows = testCollector.getRows();
for (SeaTunnelRow row : rows) {
Assertions.assertEquals(row.getField(0).getClass(), Long.class);
@@ -151,7 +151,7 @@ public class ParquetReadStrategyTest {
Assertions.assertNotNull(seaTunnelRowTypeInfo);
System.out.println(seaTunnelRowTypeInfo);
TestCollector testCollector = new TestCollector();
- parquetReadStrategy.read(path, testCollector);
+ parquetReadStrategy.read(path, "", testCollector);
}
public static class TestCollector implements Collector<SeaTunnelRow> {
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/config/LocalConf.java
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/config/LocalFileHadoopConf.java
similarity index 85%
rename from
seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/config/LocalConf.java
rename to
seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/config/LocalFileHadoopConf.java
index b1f3f35828..284167ce7d 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/config/LocalConf.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/config/LocalFileHadoopConf.java
@@ -19,12 +19,14 @@ package
org.apache.seatunnel.connectors.seatunnel.file.local.config;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
-public class LocalConf extends HadoopConf {
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+
+public class LocalFileHadoopConf extends HadoopConf {
private static final String HDFS_IMPL =
"org.apache.hadoop.fs.LocalFileSystem";
private static final String SCHEMA = "file";
- public LocalConf(String hdfsNameKey) {
- super(hdfsNameKey);
+ public LocalFileHadoopConf() {
+ super(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_DEFAULT);
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSink.java
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSink.java
index a11b8067a7..4d8037ef5f 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSink.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSink.java
@@ -22,11 +22,9 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
-import org.apache.seatunnel.connectors.seatunnel.file.local.config.LocalConf;
+import
org.apache.seatunnel.connectors.seatunnel.file.local.config.LocalFileHadoopConf;
import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSink;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
-
import com.google.auto.service.AutoService;
@AutoService(SeaTunnelSink.class)
@@ -40,6 +38,6 @@ public class LocalFileSink extends BaseFileSink {
@Override
public void prepare(Config pluginConfig) throws PrepareFailException {
super.prepare(pluginConfig);
- hadoopConf = new
LocalConf(CommonConfigurationKeys.FS_DEFAULT_NAME_DEFAULT);
+ hadoopConf = new LocalFileHadoopConf();
}
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java
index 8ff31155b8..76ee4e452e 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java
@@ -17,35 +17,37 @@
package org.apache.seatunnel.connectors.seatunnel.file.local.source;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
-import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
-import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.common.constants.PluginType;
-import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
-import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.source.SupportColumnProjection;
+import org.apache.seatunnel.api.source.SupportParallelism;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
-import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
-import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
-import org.apache.seatunnel.connectors.seatunnel.file.local.config.LocalConf;
-import
org.apache.seatunnel.connectors.seatunnel.file.local.source.config.LocalSourceConfig;
-import org.apache.seatunnel.connectors.seatunnel.file.source.BaseFileSource;
-import
org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategyFactory;
+import
org.apache.seatunnel.connectors.seatunnel.file.local.source.config.LocalFileSourceConfig;
+import
org.apache.seatunnel.connectors.seatunnel.file.local.source.config.MultipleTableLocalFileSourceConfig;
+import
org.apache.seatunnel.connectors.seatunnel.file.local.source.reader.MultipleTableLocalFileSourceReader;
+import
org.apache.seatunnel.connectors.seatunnel.file.local.source.split.LocalFileSourceSplit;
+import
org.apache.seatunnel.connectors.seatunnel.file.local.source.split.MultipleTableLocalFileSourceSplitEnumerator;
+import
org.apache.seatunnel.connectors.seatunnel.file.local.source.state.LocalFileSourceState;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
+import java.util.List;
+import java.util.stream.Collectors;
-import com.google.auto.service.AutoService;
+public class LocalFileSource
+ implements SeaTunnelSource<SeaTunnelRow, LocalFileSourceSplit,
LocalFileSourceState>,
+ SupportParallelism,
+ SupportColumnProjection {
-import java.io.IOException;
+ private final MultipleTableLocalFileSourceConfig
multipleTableLocalFileSourceConfig;
-@AutoService(SeaTunnelSource.class)
-public class LocalFileSource extends BaseFileSource {
+ public LocalFileSource(ReadonlyConfig readonlyConfig) {
+ this.multipleTableLocalFileSourceConfig =
+ new MultipleTableLocalFileSourceConfig(readonlyConfig);
+ }
@Override
public String getPluginName() {
@@ -53,75 +55,36 @@ public class LocalFileSource extends BaseFileSource {
}
@Override
- public void prepare(Config pluginConfig) throws PrepareFailException {
- CheckResult result =
- CheckConfigUtil.checkAllExists(
- pluginConfig,
- LocalSourceConfig.FILE_PATH.key(),
- LocalSourceConfig.FILE_FORMAT_TYPE.key());
- if (!result.isSuccess()) {
- throw new FileConnectorException(
- SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
- String.format(
- "PluginName: %s, PluginType: %s, Message: %s",
- getPluginName(), PluginType.SOURCE,
result.getMsg()));
- }
- readStrategy =
- ReadStrategyFactory.of(
-
pluginConfig.getString(LocalSourceConfig.FILE_FORMAT_TYPE.key()));
- readStrategy.setPluginConfig(pluginConfig);
- String path =
pluginConfig.getString(LocalSourceConfig.FILE_PATH.key());
- hadoopConf = new
LocalConf(CommonConfigurationKeys.FS_DEFAULT_NAME_DEFAULT);
- try {
- filePaths = readStrategy.getFileNamesByPath(hadoopConf, path);
- } catch (IOException e) {
- String errorMsg = String.format("Get file list from this path [%s]
failed", path);
- throw new FileConnectorException(
- FileConnectorErrorCode.FILE_LIST_GET_FAILED, errorMsg, e);
- }
- // support user-defined schema
- FileFormat fileFormat =
- FileFormat.valueOf(
- pluginConfig
-
.getString(LocalSourceConfig.FILE_FORMAT_TYPE.key())
- .toUpperCase());
- // only json text csv type support user-defined schema now
- if (pluginConfig.hasPath(TableSchemaOptions.SCHEMA.key())) {
- switch (fileFormat) {
- case CSV:
- case TEXT:
- case JSON:
- case EXCEL:
- SeaTunnelRowType userDefinedSchema =
-
CatalogTableUtil.buildWithConfig(pluginConfig).getSeaTunnelRowType();
- readStrategy.setSeaTunnelRowTypeInfo(userDefinedSchema);
- rowType = readStrategy.getActualSeaTunnelRowTypeInfo();
- break;
- case ORC:
- case PARQUET:
- throw new FileConnectorException(
- CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION,
- "SeaTunnel does not support user-defined schema
for [parquet, orc] files");
- default:
- // never got in there
- throw new FileConnectorException(
- CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
- "SeaTunnel does not supported this file format");
- }
- } else {
- if (filePaths.isEmpty()) {
- // When the directory is empty, distribute default behavior
schema
- rowType = CatalogTableUtil.buildSimpleTextSchema();
- return;
- }
- try {
- rowType = readStrategy.getSeaTunnelRowTypeInfo(hadoopConf,
filePaths.get(0));
- } catch (FileConnectorException e) {
- String errorMsg =
- String.format("Get table schema from file [%s]
failed", filePaths.get(0));
- throw new FileConnectorException(
- CommonErrorCodeDeprecated.TABLE_SCHEMA_GET_FAILED,
errorMsg, e);
- }
- }
+ public Boundedness getBoundedness() {
+ return Boundedness.BOUNDED;
+ }
+
+ @Override
+ public List<CatalogTable> getProducedCatalogTables() {
+ return
multipleTableLocalFileSourceConfig.getLocalFileSourceConfigs().stream()
+ .map(LocalFileSourceConfig::getCatalogTable)
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public SourceReader<SeaTunnelRow, LocalFileSourceSplit> createReader(
+ SourceReader.Context readerContext) {
+ return new MultipleTableLocalFileSourceReader(
+ readerContext, multipleTableLocalFileSourceConfig);
+ }
+
+ @Override
+ public SourceSplitEnumerator<LocalFileSourceSplit, LocalFileSourceState>
createEnumerator(
+ SourceSplitEnumerator.Context<LocalFileSourceSplit>
enumeratorContext) {
+ return new MultipleTableLocalFileSourceSplitEnumerator(
+ enumeratorContext, multipleTableLocalFileSourceConfig);
+ }
+
+ @Override
+ public SourceSplitEnumerator<LocalFileSourceSplit, LocalFileSourceState>
restoreEnumerator(
+ SourceSplitEnumerator.Context<LocalFileSourceSplit>
enumeratorContext,
+ LocalFileSourceState checkpointState) {
+ return new MultipleTableLocalFileSourceSplitEnumerator(
+ enumeratorContext, multipleTableLocalFileSourceConfig,
checkpointState);
}
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java
index cf34c99a5b..0e1b8163f8 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java
@@ -19,16 +19,20 @@ package
org.apache.seatunnel.connectors.seatunnel.file.local.source;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
+import org.apache.seatunnel.api.table.connector.TableSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
-import
org.apache.seatunnel.connectors.seatunnel.file.local.source.config.LocalSourceConfig;
+import
org.apache.seatunnel.connectors.seatunnel.file.local.source.config.LocalFileSourceOptions;
import com.google.auto.service.AutoService;
+import java.io.Serializable;
import java.util.Arrays;
@AutoService(Factory.class)
@@ -38,11 +42,18 @@ public class LocalFileSourceFactory implements
TableSourceFactory {
return FileSystemType.LOCAL.getFileSystemPluginName();
}
+ @Override
+ public <T, SplitT extends SourceSplit, StateT extends Serializable>
+ TableSource<T, SplitT, StateT>
createSource(TableSourceFactoryContext context) {
+ return () -> (SeaTunnelSource<T, SplitT, StateT>) new
LocalFileSource(context.getOptions());
+ }
+
@Override
public OptionRule optionRule() {
return OptionRule.builder()
- .required(LocalSourceConfig.FILE_PATH)
- .required(BaseSourceConfig.FILE_FORMAT_TYPE)
+ .optional(LocalFileSourceOptions.tables_configs)
+ .optional(BaseSourceConfig.FILE_PATH)
+ .optional(BaseSourceConfig.FILE_FORMAT_TYPE)
.conditional(
BaseSourceConfig.FILE_FORMAT_TYPE,
FileFormat.TEXT,
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/config/LocalFileSourceConfig.java
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/config/LocalFileSourceConfig.java
new file mode 100644
index 0000000000..f2fd7b8808
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/config/LocalFileSourceConfig.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.local.source.config;
+
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
+import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
+import
org.apache.seatunnel.connectors.seatunnel.file.local.config.LocalFileHadoopConf;
+import
org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategy;
+import
org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategyFactory;
+
+import org.apache.commons.collections4.CollectionUtils;
+
+import lombok.Getter;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+@Getter
+public class LocalFileSourceConfig implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private final CatalogTable catalogTable;
+ private final FileFormat fileFormat;
+ private final ReadStrategy readStrategy;
+ private final List<String> filePaths;
+ private final LocalFileHadoopConf localFileHadoopConf;
+
+ public LocalFileSourceConfig(ReadonlyConfig readonlyConfig) {
+ validateConfig(readonlyConfig);
+ this.fileFormat =
readonlyConfig.get(LocalFileSourceOptions.FILE_FORMAT_TYPE);
+ this.localFileHadoopConf = new LocalFileHadoopConf();
+ this.readStrategy = ReadStrategyFactory.of(readonlyConfig,
localFileHadoopConf);
+ this.filePaths = parseFilePaths(readonlyConfig);
+ this.catalogTable = parseCatalogTable(readonlyConfig);
+ }
+
+ private void validateConfig(ReadonlyConfig readonlyConfig) {
+ if
(!readonlyConfig.getOptional(LocalFileSourceOptions.FILE_PATH).isPresent()) {
+ throw new FileConnectorException(
+ SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+ String.format(
+ "PluginName: %s, PluginType: %s, Message: %s",
+ FileSystemType.LOCAL.getFileSystemPluginName(),
+ PluginType.SOURCE,
+                            LocalFileSourceOptions.FILE_PATH.key() + " is
required"));
+ }
+ if
(!readonlyConfig.getOptional(LocalFileSourceOptions.FILE_FORMAT_TYPE).isPresent())
{
+ throw new FileConnectorException(
+ SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+ String.format(
+ "PluginName: %s, PluginType: %s, Message: %s",
+ FileSystemType.LOCAL.getFileSystemPluginName(),
+ PluginType.SOURCE,
+ LocalFileSourceOptions.FILE_FORMAT_TYPE.key() + "
is required"));
+ }
+ }
+
+ private List<String> parseFilePaths(ReadonlyConfig readonlyConfig) {
+ String rootPath = null;
+ try {
+ rootPath = readonlyConfig.get(LocalFileSourceOptions.FILE_PATH);
+ return readStrategy.getFileNamesByPath(localFileHadoopConf,
rootPath);
+ } catch (Exception ex) {
+ String errorMsg = String.format("Get file list from this path [%s]
failed", rootPath);
+ throw new FileConnectorException(
+ FileConnectorErrorCode.FILE_LIST_GET_FAILED, errorMsg, ex);
+ }
+ }
+
+ private CatalogTable parseCatalogTable(ReadonlyConfig readonlyConfig) {
+ final CatalogTable catalogTable;
+ if (readonlyConfig.getOptional(TableSchemaOptions.SCHEMA).isPresent())
{
+ catalogTable =
+ CatalogTableUtil.buildWithConfig(
+ FileSystemType.LOCAL.getFileSystemPluginName(),
readonlyConfig);
+ } else {
+ catalogTable = CatalogTableUtil.buildSimpleTextTable();
+ }
+ if (CollectionUtils.isEmpty(filePaths)) {
+ return catalogTable;
+ }
+ switch (fileFormat) {
+ case CSV:
+ case TEXT:
+ case JSON:
+ case EXCEL:
+
readStrategy.setSeaTunnelRowTypeInfo(catalogTable.getSeaTunnelRowType());
+ return newCatalogTable(catalogTable,
readStrategy.getActualSeaTunnelRowTypeInfo());
+ case ORC:
+ case PARQUET:
+ return newCatalogTable(
+ catalogTable,
+ readStrategy.getSeaTunnelRowTypeInfo(
+ localFileHadoopConf, filePaths.get(0)));
+ default:
+ throw new FileConnectorException(
+ FileConnectorErrorCode.FORMAT_NOT_SUPPORT,
+                        "SeaTunnel does not support this file format: [" +
fileFormat + "]");
+ }
+ }
+
+ private CatalogTable newCatalogTable(
+ CatalogTable catalogTable, SeaTunnelRowType seaTunnelRowType) {
+ TableSchema tableSchema = catalogTable.getTableSchema();
+
+ Map<String, Column> columnMap =
+ tableSchema.getColumns().stream()
+ .collect(Collectors.toMap(Column::getName,
Function.identity()));
+ String[] fieldNames = seaTunnelRowType.getFieldNames();
+ SeaTunnelDataType<?>[] fieldTypes = seaTunnelRowType.getFieldTypes();
+
+ List<Column> finalColumns = new ArrayList<>();
+ for (int i = 0; i < fieldNames.length; i++) {
+ Column column = columnMap.get(fieldNames[i]);
+ if (column != null) {
+ finalColumns.add(column);
+ } else {
+ finalColumns.add(
+ PhysicalColumn.of(fieldNames[i], fieldTypes[i], 0,
false, null, null));
+ }
+ }
+
+ TableSchema finalSchema =
+ TableSchema.builder()
+ .columns(finalColumns)
+ .primaryKey(tableSchema.getPrimaryKey())
+ .constraintKey(tableSchema.getConstraintKeys())
+ .build();
+
+ return CatalogTable.of(
+ catalogTable.getTableId(),
+ finalSchema,
+ catalogTable.getOptions(),
+ catalogTable.getPartitionKeys(),
+ catalogTable.getComment(),
+ catalogTable.getCatalogName());
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/config/LocalSourceConfig.java
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/config/LocalFileSourceOptions.java
similarity index 58%
copy from
seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/config/LocalSourceConfig.java
copy to
seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/config/LocalFileSourceOptions.java
index 2f43ff8d9f..6cd689fcdc 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/config/LocalSourceConfig.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/config/LocalFileSourceOptions.java
@@ -17,6 +17,21 @@
package org.apache.seatunnel.connectors.seatunnel.file.local.source.config;
+import
org.apache.seatunnel.shade.com.fasterxml.jackson.core.type.TypeReference;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfig;
-public class LocalSourceConfig extends BaseSourceConfig {}
+import java.util.List;
+import java.util.Map;
+
+public final class LocalFileSourceOptions extends BaseSourceConfig {
+
+ public static final Option<List<Map<String, Object>>> tables_configs =
+ Options.key("tables_configs")
+ .type(new TypeReference<List<Map<String, Object>>>() {})
+ .noDefaultValue()
+ .withDescription(
+                                "Local file source configs, used to create
multiple local file sources.");
+}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/config/MultipleTableLocalFileSourceConfig.java
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/config/MultipleTableLocalFileSourceConfig.java
new file mode 100644
index 0000000000..0abbaa511e
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/config/MultipleTableLocalFileSourceConfig.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.local.source.config;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+
+import com.google.common.collect.Lists;
+import lombok.Getter;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class MultipleTableLocalFileSourceConfig implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ @Getter private List<LocalFileSourceConfig> localFileSourceConfigs;
+
+ public MultipleTableLocalFileSourceConfig(ReadonlyConfig
localFileSourceRootConfig) {
+ if (localFileSourceRootConfig
+ .getOptional(LocalFileSourceOptions.tables_configs)
+ .isPresent()) {
+ parseFromLocalFileSourceConfigs(localFileSourceRootConfig);
+ } else {
+ parseFromLocalFileSourceConfig(localFileSourceRootConfig);
+ }
+ }
+
+ private void parseFromLocalFileSourceConfigs(ReadonlyConfig
localFileSourceRootConfig) {
+ this.localFileSourceConfigs =
+
localFileSourceRootConfig.get(LocalFileSourceOptions.tables_configs).stream()
+ .map(ReadonlyConfig::fromMap)
+ .map(LocalFileSourceConfig::new)
+ .collect(Collectors.toList());
+ }
+
+ private void parseFromLocalFileSourceConfig(ReadonlyConfig
localFileSourceRootConfig) {
+ LocalFileSourceConfig localFileSourceConfig =
+ new LocalFileSourceConfig(localFileSourceRootConfig);
+ this.localFileSourceConfigs =
Lists.newArrayList(localFileSourceConfig);
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/reader/MultipleTableLocalFileSourceReader.java
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/reader/MultipleTableLocalFileSourceReader.java
new file mode 100644
index 0000000000..bd990db50f
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/reader/MultipleTableLocalFileSourceReader.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.local.source.reader;
+
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
+import
org.apache.seatunnel.connectors.seatunnel.file.local.source.config.LocalFileSourceConfig;
+import
org.apache.seatunnel.connectors.seatunnel.file.local.source.config.MultipleTableLocalFileSourceConfig;
+import
org.apache.seatunnel.connectors.seatunnel.file.local.source.split.LocalFileSourceSplit;
+import
org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategy;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.stream.Collectors;
+
+import static
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode.FILE_READ_FAILED;
+import static
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode.FILE_READ_STRATEGY_NOT_SUPPORT;
+
+@Slf4j
+public class MultipleTableLocalFileSourceReader
+ implements SourceReader<SeaTunnelRow, LocalFileSourceSplit> {
+
+ private final SourceReader.Context context;
+ private volatile boolean noMoreSplit;
+
+ private final Deque<LocalFileSourceSplit> sourceSplits = new
ConcurrentLinkedDeque<>();
+
+ private final Map<String, ReadStrategy> readStrategyMap;
+
+ public MultipleTableLocalFileSourceReader(
+ SourceReader.Context context,
+ MultipleTableLocalFileSourceConfig
multipleTableLocalFileSourceConfig) {
+ this.context = context;
+ this.readStrategyMap =
+
multipleTableLocalFileSourceConfig.getLocalFileSourceConfigs().stream()
+ .collect(
+ Collectors.toMap(
+ localFileSourceConfig ->
+ localFileSourceConfig
+ .getCatalogTable()
+ .getTableId()
+ .toTablePath()
+ .toString(),
+
LocalFileSourceConfig::getReadStrategy));
+ }
+
+ @Override
+ public void pollNext(Collector<SeaTunnelRow> output) {
+ synchronized (output.getCheckpointLock()) {
+ LocalFileSourceSplit split = sourceSplits.poll();
+ if (null != split) {
+ ReadStrategy readStrategy =
readStrategyMap.get(split.getTableId());
+ if (readStrategy == null) {
+ throw new FileConnectorException(
+ FILE_READ_STRATEGY_NOT_SUPPORT,
+                            "Cannot find the read strategy for this table: ["
+ + split.getTableId()
+ + "]");
+ }
+ try {
+ readStrategy.read(split.getFilePath(), split.getTableId(),
output);
+ } catch (Exception e) {
+ String errorMsg =
+ String.format("Read data from this file [%s]
failed", split.splitId());
+ throw new FileConnectorException(FILE_READ_FAILED,
errorMsg, e);
+ }
+ } else if (noMoreSplit && sourceSplits.isEmpty()) {
+ // signal to the source that we have reached the end of the
data.
+ log.info(
+ "There is no more element for the bounded
MultipleTableLocalFileSourceReader");
+ context.signalNoMoreElement();
+ }
+ }
+ }
+
+ @Override
+ public List<LocalFileSourceSplit> snapshotState(long checkpointId) {
+ return new ArrayList<>(sourceSplits);
+ }
+
+ @Override
+ public void addSplits(List<LocalFileSourceSplit> splits) {
+ sourceSplits.addAll(splits);
+ }
+
+ @Override
+ public void handleNoMoreSplits() {
+ noMoreSplit = true;
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) {
+ // do nothing
+ }
+
+ @Override
+ public void open() throws Exception {
+ // do nothing
+ log.info("Opened the MultipleTableLocalFileSourceReader");
+ }
+
+ @Override
+ public void close() throws IOException {
+ // do nothing
+ log.info("Closed the MultipleTableLocalFileSourceReader");
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/config/LocalSourceConfig.java
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/split/LocalFileSourceSplit.java
similarity index 62%
copy from
seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/config/LocalSourceConfig.java
copy to
seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/split/LocalFileSourceSplit.java
index 2f43ff8d9f..89bab1bee4 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/config/LocalSourceConfig.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/split/LocalFileSourceSplit.java
@@ -15,8 +15,26 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.file.local.source.config;
+package org.apache.seatunnel.connectors.seatunnel.file.local.source.split;
-import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfig;
+import org.apache.seatunnel.api.source.SourceSplit;
-public class LocalSourceConfig extends BaseSourceConfig {}
+import lombok.Getter;
+
+public class LocalFileSourceSplit implements SourceSplit {
+
+ private static final long serialVersionUID = 1L;
+
+ @Getter private final String tableId;
+ @Getter private final String filePath;
+
+ public LocalFileSourceSplit(String tableId, String filePath) {
+ this.tableId = tableId;
+ this.filePath = filePath;
+ }
+
+ @Override
+ public String splitId() {
+ return tableId + "_" + filePath;
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/split/MultipleTableLocalFileSourceSplitEnumerator.java
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/split/MultipleTableLocalFileSourceSplitEnumerator.java
new file mode 100644
index 0000000000..f00885f496
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/split/MultipleTableLocalFileSourceSplitEnumerator.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.local.source.split;
+
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import
org.apache.seatunnel.connectors.seatunnel.file.local.source.config.LocalFileSourceConfig;
+import
org.apache.seatunnel.connectors.seatunnel.file.local.source.config.MultipleTableLocalFileSourceConfig;
+import
org.apache.seatunnel.connectors.seatunnel.file.local.source.state.LocalFileSourceState;
+
+import org.apache.commons.collections4.CollectionUtils;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+@Slf4j
+public class MultipleTableLocalFileSourceSplitEnumerator
+ implements SourceSplitEnumerator<LocalFileSourceSplit,
LocalFileSourceState> {
+
+ private final SourceSplitEnumerator.Context<LocalFileSourceSplit> context;
+ private final Set<LocalFileSourceSplit> pendingSplit;
+ private final Set<LocalFileSourceSplit> assignedSplit;
+ private final Map<String, List<String>> filePathMap;
+
+ public MultipleTableLocalFileSourceSplitEnumerator(
+ SourceSplitEnumerator.Context<LocalFileSourceSplit> context,
+ MultipleTableLocalFileSourceConfig
multipleTableLocalFileSourceConfig) {
+ this.context = context;
+ this.filePathMap =
+
multipleTableLocalFileSourceConfig.getLocalFileSourceConfigs().stream()
+ .collect(
+ Collectors.toMap(
+ localFileSourceConfig ->
+ localFileSourceConfig
+ .getCatalogTable()
+ .getTableId()
+ .toTablePath()
+ .toString(),
+ LocalFileSourceConfig::getFilePaths));
+ this.assignedSplit = new HashSet<>();
+ this.pendingSplit = new HashSet<>();
+ }
+
+ public MultipleTableLocalFileSourceSplitEnumerator(
+ SourceSplitEnumerator.Context<LocalFileSourceSplit> context,
+ MultipleTableLocalFileSourceConfig
multipleTableLocalFileSourceConfig,
+ LocalFileSourceState localFileSourceState) {
+ this(context, multipleTableLocalFileSourceConfig);
+ this.assignedSplit.addAll(localFileSourceState.getAssignedSplit());
+ }
+
+ @Override
+ public void addSplitsBack(List<LocalFileSourceSplit> splits, int
subtaskId) {
+ if (CollectionUtils.isEmpty(splits)) {
+ return;
+ }
+ pendingSplit.addAll(splits);
+ assignSplit(subtaskId);
+ }
+
+ @Override
+ public int currentUnassignedSplitSize() {
+ return pendingSplit.size();
+ }
+
+ @Override
+ public void handleSplitRequest(int subtaskId) {}
+
+ @Override
+ public void registerReader(int subtaskId) {
+ for (Map.Entry<String, List<String>> filePathEntry :
filePathMap.entrySet()) {
+ String tableId = filePathEntry.getKey();
+ List<String> filePaths = filePathEntry.getValue();
+ for (String filePath : filePaths) {
+ pendingSplit.add(new LocalFileSourceSplit(tableId, filePath));
+ }
+ }
+ assignSplit(subtaskId);
+ }
+
+ @Override
+ public LocalFileSourceState snapshotState(long checkpointId) {
+ return new LocalFileSourceState(assignedSplit);
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) {
+ // do nothing.
+ }
+
+ private void assignSplit(int taskId) {
+ List<LocalFileSourceSplit> currentTaskSplits = new ArrayList<>();
+ if (context.currentParallelism() == 1) {
+ // if parallelism == 1, we should assign all the splits to reader
+ currentTaskSplits.addAll(pendingSplit);
+ } else {
+            // if parallelism > 1, use the hashCode of the split's id to
determine whether to
+            // assign the split to the current task
+ for (LocalFileSourceSplit fileSourceSplit : pendingSplit) {
+ int splitOwner =
+ getSplitOwner(fileSourceSplit.splitId(),
context.currentParallelism());
+ if (splitOwner == taskId) {
+ currentTaskSplits.add(fileSourceSplit);
+ }
+ }
+ }
+ // assign splits
+ context.assignSplit(taskId, currentTaskSplits);
+ // save the state of assigned splits
+ assignedSplit.addAll(currentTaskSplits);
+ // remove the assigned splits from pending splits
+ currentTaskSplits.forEach(pendingSplit::remove);
+ log.info(
+ "SubTask {} is assigned to [{}]",
+ taskId,
+ currentTaskSplits.stream()
+ .map(LocalFileSourceSplit::splitId)
+ .collect(Collectors.joining(",")));
+ context.signalNoMoreSplits(taskId);
+ }
+
+ private static int getSplitOwner(String tp, int numReaders) {
+ return (tp.hashCode() & Integer.MAX_VALUE) % numReaders;
+ }
+
+ @Override
+ public void open() {
+ // do nothing
+ }
+
+ @Override
+ public void run() throws Exception {
+ // do nothing
+ }
+
+ @Override
+ public void close() throws IOException {
+ // do nothing
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/config/LocalSourceConfig.java
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/state/LocalFileSourceState.java
similarity index 60%
rename from
seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/config/LocalSourceConfig.java
rename to
seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/state/LocalFileSourceState.java
index 2f43ff8d9f..2cc09f92ff 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/config/LocalSourceConfig.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/state/LocalFileSourceState.java
@@ -15,8 +15,24 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.file.local.source.config;
+package org.apache.seatunnel.connectors.seatunnel.file.local.source.state;
-import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfig;
+import
org.apache.seatunnel.connectors.seatunnel.file.local.source.split.LocalFileSourceSplit;
-public class LocalSourceConfig extends BaseSourceConfig {}
+import java.io.Serializable;
+import java.util.Set;
+
+public class LocalFileSourceState implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private final Set<LocalFileSourceSplit> assignedSplit;
+
+ public LocalFileSourceState(Set<LocalFileSourceSplit> assignedSplit) {
+ this.assignedSplit = assignedSplit;
+ }
+
+ public Set<LocalFileSourceSplit> getAssignedSplit() {
+ return assignedSplit;
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileWithMultipleTableIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileWithMultipleTableIT.java
new file mode 100644
index 0000000000..10d1a63429
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileWithMultipleTableIT.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.file.local;
+
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
+import org.apache.seatunnel.e2e.common.container.EngineType;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.container.TestHelper;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
+import org.apache.seatunnel.e2e.common.util.ContainerUtil;
+
+import org.junit.jupiter.api.TestTemplate;
+
+import java.io.IOException;
+
+@DisabledOnContainer(
+ value = {},
+ type = {EngineType.SPARK, EngineType.FLINK},
+ disabledReason = "Currently SPARK and FLINK do not support multi
table")
+public class LocalFileWithMultipleTableIT extends TestSuiteBase {
+
+ /** Copy data files to container */
+ @TestContainerExtension
+ private final ContainerExtendedFactory extendedFactory =
+ container -> {
+ ContainerUtil.copyFileIntoContainers(
+ "/excel/e2e.xlsx",
+
"/seatunnel/read/excel/name=tyrantlucifer/hobby=coding/e2e.xlsx",
+ container);
+
+ ContainerUtil.copyFileIntoContainers(
+ "/json/e2e.json",
+
"/seatunnel/read/json/name=tyrantlucifer/hobby=coding/e2e.json",
+ container);
+
+ ContainerUtil.copyFileIntoContainers(
+ "/orc/e2e.orc",
+
"/seatunnel/read/orc/name=tyrantlucifer/hobby=coding/e2e.orc",
+ container);
+
+ ContainerUtil.copyFileIntoContainers(
+ "/parquet/e2e.parquet",
+
"/seatunnel/read/parquet/name=tyrantlucifer/hobby=coding/e2e.parquet",
+ container);
+
+ ContainerUtil.copyFileIntoContainers(
+ "/text/e2e.txt",
+
"/seatunnel/read/text/name=tyrantlucifer/hobby=coding/e2e.txt",
+ container);
+ };
+
+ @TestTemplate
+ public void
testLocalFileReadAndWriteInMultipleTableMode_excel(TestContainer container)
+ throws IOException, InterruptedException {
+ TestHelper helper = new TestHelper(container);
+ helper.execute("/excel/local_excel_to_assert_with_multipletable.conf");
+ }
+
+ @TestTemplate
+ public void
testLocalFileReadAndWriteInMultipleTableMode_json(TestContainer container)
+ throws IOException, InterruptedException {
+ TestHelper helper = new TestHelper(container);
+
helper.execute("/json/local_file_json_to_assert_with_multipletable.conf");
+ }
+
+ @TestTemplate
+ public void testLocalFileReadAndWriteInMultipleTableMode_orc(TestContainer
container)
+ throws IOException, InterruptedException {
+ TestHelper helper = new TestHelper(container);
+
helper.execute("/orc/local_file_orc_to_assert_with_multipletable.conf");
+ }
+
+ @TestTemplate
+ public void
testLocalFileReadAndWriteInMultipleTableMode_parquet(TestContainer container)
+ throws IOException, InterruptedException {
+ TestHelper helper = new TestHelper(container);
+
helper.execute("/parquet/local_file_parquet_to_assert_with_multipletable.conf");
+ }
+
+ @TestTemplate
+ public void
testLocalFileReadAndWriteInMultipleTableMode_text(TestContainer container)
+ throws IOException, InterruptedException {
+ TestHelper helper = new TestHelper(container);
+
helper.execute("/text/local_file_text_to_assert_with_multipletable.conf");
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/excel/local_excel_to_assert_with_multipletable.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/excel/local_excel_to_assert_with_multipletable.conf
new file mode 100644
index 0000000000..b1c86c665d
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/excel/local_excel_to_assert_with_multipletable.conf
@@ -0,0 +1,124 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ execution.parallelism = 1
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+ job.mode = "BATCH"
+}
+
+source {
+ LocalFile {
+ tables_configs = [
+ {
+ path = "/seatunnel/read/excel"
+ file_format_type = excel
+ field_delimiter = ;
+ skip_header_row_number = 1
+ schema = {
+ table = "fake01"
+ fields {
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ c_row = {
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ }
+ }
+ }
+ },
+ {
+ path = "/seatunnel/read/excel"
+ file_format_type = excel
+ field_delimiter = ;
+ skip_header_row_number = 1
+ schema = {
+ table = "fake02"
+ fields {
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ c_row = {
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ }
+ }
+ }
+ }
+ ]
+ result_table_name = "fake"
+ }
+}
+
+sink {
+ Assert {
+ rules {
+ table-names = ["fake01", "fake02"]
+ }
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/local_file_json_to_assert_with_multipletable.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/local_file_json_to_assert_with_multipletable.conf
new file mode 100644
index 0000000000..0df06ad7ea
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/local_file_json_to_assert_with_multipletable.conf
@@ -0,0 +1,120 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ execution.parallelism = 1
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+ job.mode = "BATCH"
+}
+
+source {
+ LocalFile {
+ tables_configs = [
+ {
+ path = "/seatunnel/read/json"
+ file_format_type = "json"
+ schema = {
+ table = "fake01"
+ fields {
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ c_row = {
+ C_MAP = "map<string, string>"
+ C_ARRAY = "array<int>"
+ C_STRING = string
+ C_BOOLEAN = boolean
+ C_TINYINT = tinyint
+ C_SMALLINT = smallint
+ C_INT = int
+ C_BIGINT = bigint
+ C_FLOAT = float
+ C_DOUBLE = double
+ C_BYTES = bytes
+ C_DATE = date
+ C_DECIMAL = "decimal(38, 18)"
+ C_TIMESTAMP = timestamp
+ }
+ }
+ }
+ },
+ {
+ path = "/seatunnel/read/json"
+ file_format_type = "json"
+ schema = {
+ table = "fake02"
+ fields {
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ c_row = {
+ C_MAP = "map<string, string>"
+ C_ARRAY = "array<int>"
+ C_STRING = string
+ C_BOOLEAN = boolean
+ C_TINYINT = tinyint
+ C_SMALLINT = smallint
+ C_INT = int
+ C_BIGINT = bigint
+ C_FLOAT = float
+ C_DOUBLE = double
+ C_BYTES = bytes
+ C_DATE = date
+ C_DECIMAL = "decimal(38, 18)"
+ C_TIMESTAMP = timestamp
+ }
+ }
+ }
+ }
+ ]
+ result_table_name = "fake"
+ }
+}
+
+sink {
+ Assert {
+ rules {
+ table-names = ["fake01", "fake02"]
+ }
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/local_file_to_console.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/local_file_to_console.conf
index 4595f83888..c04d448df9 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/local_file_to_console.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/local_file_to_console.conf
@@ -29,6 +29,10 @@ source {
LocalFile {
path = "/tmp/fake_empty"
file_format_type = "json"
+      # schema is required for the json file format, since json files carry no type information
+ schema {
+
+ }
}
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/local_file_to_console.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/local_file_orc_to_assert_with_multipletable.conf
similarity index 69%
copy from
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/local_file_to_console.conf
copy to
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/local_file_orc_to_assert_with_multipletable.conf
index 4595f83888..a1df2c1d62 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/local_file_to_console.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/local_file_orc_to_assert_with_multipletable.conf
@@ -27,11 +27,30 @@ env {
source {
LocalFile {
- path = "/tmp/fake_empty"
- file_format_type = "json"
+ tables_configs = [
+ {
+ schema = {
+ table = "fake01"
+ }
+ path = "/seatunnel/read/orc"
+ file_format_type = "orc"
+ },
+ {
+ schema = {
+ table = "fake02"
+ }
+ path = "/seatunnel/read/orc"
+ file_format_type = "orc"
+ }
+ ]
+ result_table_name = "fake"
}
}
sink {
- Console {}
+ Assert {
+ rules {
+ table-names = ["fake01", "fake02"]
+ }
+ }
}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/local_file_to_console.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/parquet/local_file_parquet_to_assert_with_multipletable.conf
similarity index 69%
copy from
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/local_file_to_console.conf
copy to
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/parquet/local_file_parquet_to_assert_with_multipletable.conf
index 4595f83888..a30c72447b 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/local_file_to_console.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/parquet/local_file_parquet_to_assert_with_multipletable.conf
@@ -27,11 +27,30 @@ env {
source {
LocalFile {
- path = "/tmp/fake_empty"
- file_format_type = "json"
+ tables_configs = [
+ {
+ schema = {
+ table = "fake01"
+ }
+ path = "/seatunnel/read/parquet"
+ file_format_type = "parquet"
+ },
+ {
+ schema = {
+ table = "fake02"
+ }
+ path = "/seatunnel/read/parquet"
+ file_format_type = "parquet"
+ }
+ ]
+ result_table_name = "fake"
}
}
sink {
- Console {}
+ Assert {
+ rules {
+ table-names = ["fake01", "fake02"]
+ }
+ }
}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/local_file_text_to_assert_with_multipletable.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/local_file_text_to_assert_with_multipletable.conf
new file mode 100644
index 0000000000..8e4e3db71b
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/local_file_text_to_assert_with_multipletable.conf
@@ -0,0 +1,120 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ execution.parallelism = 1
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+ job.mode = "BATCH"
+}
+
+source {
+ LocalFile {
+ tables_configs = [
+ {
+ path = "/seatunnel/read/text"
+ file_format_type = "text"
+ schema = {
+ table = "fake01"
+ fields {
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ c_row = {
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ }
+ }
+ }
+ },
+ {
+ path = "/seatunnel/read/text"
+ file_format_type = "text"
+ schema = {
+ table = "fake02"
+ fields {
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ c_row = {
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ }
+ }
+ }
+ }
+ ]
+ result_table_name = "fake"
+ }
+}
+
+sink {
+ Assert {
+ rules {
+ table-names = ["fake01", "fake02"]
+ }
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/write-cdc-changelog-to-kudu.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/write-cdc-changelog-to-kudu.conf
index 6ad7d6fd42..f32df5fbb7 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/write-cdc-changelog-to-kudu.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/write-cdc-changelog-to-kudu.conf
@@ -20,51 +20,51 @@ env {
job.mode = "BATCH"
}
- source {
- FakeSource {
- schema = {
- fields {
- id = int
- val_bool = boolean
- val_int8 = tinyint
- val_int16 = smallint
- val_int32 = int
- val_int64 = bigint
- val_float = float
- val_double = double
- val_decimal = "decimal(16, 1)"
- val_string = string
- val_unixtime_micros = timestamp
- }
- }
- rows = [
- {
- kind = INSERT
- fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW",
"2020-02-02T02:02:02"]
- },
- {
- kind = INSERT
- fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW",
"2020-02-02T02:02:02"]
- },
- {
- kind = INSERT
- fields = [3, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW",
"2020-02-02T02:02:02"]
- },
- {
- kind = UPDATE_BEFORE
- fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW",
"2020-02-02T02:02:02"]
- },
- {
- kind = UPDATE_AFTER
- fields = [1, true, 2, 2, 3, 4, 4.3,5.3,6.3, "NEW",
"2020-02-02T02:02:02"]
- },
- {
- kind = DELETE
- fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW",
"2020-02-02T02:02:02"]
- }
- ]
+source {
+ FakeSource {
+ schema = {
+ fields {
+ id = int
+ val_bool = boolean
+ val_int8 = tinyint
+ val_int16 = smallint
+ val_int32 = int
+ val_int64 = bigint
+ val_float = float
+ val_double = double
+ val_decimal = "decimal(16, 1)"
+ val_string = string
+ val_unixtime_micros = timestamp
}
}
+ rows = [
+ {
+ kind = INSERT
+ fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW",
"2020-02-02T02:02:02"]
+ },
+ {
+ kind = INSERT
+ fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW",
"2020-02-02T02:02:02"]
+ },
+ {
+ kind = INSERT
+ fields = [3, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW",
"2020-02-02T02:02:02"]
+ },
+ {
+ kind = UPDATE_BEFORE
+ fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW",
"2020-02-02T02:02:02"]
+ },
+ {
+ kind = UPDATE_AFTER
+ fields = [1, true, 2, 2, 3, 4, 4.3,5.3,6.3, "NEW",
"2020-02-02T02:02:02"]
+ },
+ {
+ kind = DELETE
+ fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW",
"2020-02-02T02:02:02"]
+ }
+ ]
+ }
+}
sink {
kudu{