This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 72be6663ad [Feature] LocalFileSource support multiple table
72be6663ad is described below

commit 72be6663ad7e2252d683e365df5462114eff2cd3
Author: Wenjun Ruan <[email protected]>
AuthorDate: Mon Nov 13 13:43:50 2023 +0800

    [Feature] LocalFileSource support multiple table
---
 docs/en/connector-v2/source/LocalFile.md           |  92 +++++++++--
 .../file/exception/FileConnectorErrorCode.java     |   7 +-
 .../file/source/BaseFileSourceReader.java          |   4 +-
 .../file/source/reader/ExcelReadStrategy.java      |   3 +-
 .../file/source/reader/JsonReadStrategy.java       |   3 +-
 .../file/source/reader/OrcReadStrategy.java        |   3 +-
 .../file/source/reader/ParquetReadStrategy.java    |   3 +-
 .../seatunnel/file/source/reader/ReadStrategy.java |   5 +-
 .../file/source/reader/ReadStrategyFactory.java    |  11 ++
 .../file/source/reader/TextReadStrategy.java       |   3 +-
 .../seatunnel/file/writer/OrcReadStrategyTest.java |   4 +-
 .../file/writer/ParquetReadStrategyTest.java       |  12 +-
 .../{LocalConf.java => LocalFileHadoopConf.java}   |   8 +-
 .../seatunnel/file/local/sink/LocalFileSink.java   |   6 +-
 .../file/local/source/LocalFileSource.java         | 149 +++++++-----------
 .../file/local/source/LocalFileSourceFactory.java  |  17 +-
 .../local/source/config/LocalFileSourceConfig.java | 171 +++++++++++++++++++++
 ...urceConfig.java => LocalFileSourceOptions.java} |  17 +-
 .../config/MultipleTableLocalFileSourceConfig.java |  58 +++++++
 .../reader/MultipleTableLocalFileSourceReader.java | 130 ++++++++++++++++
 .../LocalFileSourceSplit.java}                     |  24 ++-
 ...ultipleTableLocalFileSourceSplitEnumerator.java | 161 +++++++++++++++++++
 .../LocalFileSourceState.java}                     |  22 ++-
 .../file/local/LocalFileWithMultipleTableIT.java   | 103 +++++++++++++
 .../local_excel_to_assert_with_multipletable.conf  | 124 +++++++++++++++
 ...cal_file_json_to_assert_with_multipletable.conf | 120 +++++++++++++++
 .../test/resources/json/local_file_to_console.conf |   4 +
 ...cal_file_orc_to_assert_with_multipletable.conf} |  25 ++-
 ...file_parquet_to_assert_with_multipletable.conf} |  25 ++-
 ...cal_file_text_to_assert_with_multipletable.conf | 120 +++++++++++++++
 .../resources/write-cdc-changelog-to-kudu.conf     |  86 +++++------
 31 files changed, 1329 insertions(+), 191 deletions(-)

diff --git a/docs/en/connector-v2/source/LocalFile.md 
b/docs/en/connector-v2/source/LocalFile.md
index f562fd30ae..4d20ca532d 100644
--- a/docs/en/connector-v2/source/LocalFile.md
+++ b/docs/en/connector-v2/source/LocalFile.md
@@ -41,22 +41,23 @@ If you use SeaTunnel Engine, It automatically integrated 
the hadoop jar when you
 
 ## Options
 
-|           name            |  type   | required |    default value    |
-|---------------------------|---------|----------|---------------------|
-| path                      | string  | yes      | -                   |
-| file_format_type          | string  | yes      | -                   |
-| read_columns              | list    | no       | -                   |
-| delimiter/field_delimiter | string  | no       | \001                |
-| parse_partition_from_path | boolean | no       | true                |
-| date_format               | string  | no       | yyyy-MM-dd          |
-| datetime_format           | string  | no       | yyyy-MM-dd HH:mm:ss |
-| time_format               | string  | no       | HH:mm:ss            |
-| skip_header_row_number    | long    | no       | 0                   |
-| schema                    | config  | no       | -                   |
-| sheet_name                | string  | no       | -                   |
-| file_filter_pattern       | string  | no       | -                   |
-| compress_codec            | string  | no       | none                |
-| common-options            |         | no       | -                   |
+|           name            |  type   | required |            default value    
         |
+|---------------------------|---------|----------|--------------------------------------|
+| path                      | string  | yes      | -                           
         |
+| file_format_type          | string  | yes      | -                           
         |
+| read_columns              | list    | no       | -                           
         |
+| delimiter/field_delimiter | string  | no       | \001                        
         |
+| parse_partition_from_path | boolean | no       | true                        
         |
+| date_format               | string  | no       | yyyy-MM-dd                  
         |
+| datetime_format           | string  | no       | yyyy-MM-dd HH:mm:ss         
         |
+| time_format               | string  | no       | HH:mm:ss                    
         |
+| skip_header_row_number    | long    | no       | 0                           
         |
+| schema                    | config  | no       | -                           
         |
+| sheet_name                | string  | no       | -                           
         |
+| file_filter_pattern       | string  | no       | -                           
         |
+| compress_codec            | string  | no       | none                        
         |
+| common-options            |         | no       | -                           
         |
+| tables_configs            | list    | no       | used to define a multiple 
table task |
 
 ### path [string]
 
@@ -244,8 +245,14 @@ The compress codec of files and the details that supported 
as the following show
 
 Source plugin common parameters, please refer to [Source Common 
Options](common-options.md) for details
 
+### tables_configs
+
+Used to define a multiple table task, when you have multiple tables to read, 
you can use this option to define multiple tables.
+
 ## Example
 
+### One Table
+
 ```hocon
 
 LocalFile {
@@ -270,6 +277,59 @@ LocalFile {
 
 ```
 
+### Multiple Table
+
+```hocon
+
+LocalFile {
+  tables_configs = [
+    {
+      schema {
+        table = "student"
+      }
+      path = "/apps/hive/demo/student"
+      file_format_type = "parquet"
+    },
+    {
+      schema {
+        table = "teacher"
+      }
+      path = "/apps/hive/demo/teacher"
+      file_format_type = "parquet"
+    }
+  ]
+}
+
+```
+
+```hocon
+
+LocalFile {
+  tables_configs = [
+    {
+      schema {
+        fields {
+          name = string
+          age = int
+        }
+      }
+      path = "/apps/hive/demo/student"
+      file_format_type = "json"
+    },
+    {
+      schema {
+        fields {
+          name = string
+          age = int
+        }
+      }
+      path = "/apps/hive/demo/teacher"
+      file_format_type = "json"
+    }
+}
+
+```
+
 ## Changelog
 
 ### 2.2.0-beta 2022-09-26
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/exception/FileConnectorErrorCode.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/exception/FileConnectorErrorCode.java
index 65e9590f34..27dca4a6bc 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/exception/FileConnectorErrorCode.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/exception/FileConnectorErrorCode.java
@@ -23,7 +23,12 @@ public enum FileConnectorErrorCode implements 
SeaTunnelErrorCode {
     FILE_TYPE_INVALID("FILE-01", "File type is invalid"),
     DATA_DESERIALIZE_FAILED("FILE-02", "Data deserialization failed"),
     FILE_LIST_GET_FAILED("FILE-03", "Get file list failed"),
-    FILE_LIST_EMPTY("FILE-04", "File list is empty");
+    FILE_LIST_EMPTY("FILE-04", "File list is empty"),
+    AGGREGATE_COMMIT_ERROR("FILE-05", "Aggregate committer error"),
+    FILE_READ_STRATEGY_NOT_SUPPORT("FILE-06", "File strategy not support"),
+    FORMAT_NOT_SUPPORT("FILE-07", "Format not support"),
+    FILE_READ_FAILED("FILE-08", "File read failed"),
+    ;
 
     private final String code;
     private final String description;
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/BaseFileSourceReader.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/BaseFileSourceReader.java
index 18332b9436..7082b6171e 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/BaseFileSourceReader.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/BaseFileSourceReader.java
@@ -63,7 +63,9 @@ public class BaseFileSourceReader implements 
SourceReader<SeaTunnelRow, FileSour
             FileSourceSplit split = sourceSplits.poll();
             if (null != split) {
                 try {
-                    readStrategy.read(split.splitId(), output);
+                    // todo: If there is only one table , the tableId is not 
needed, but it's better
+                    // to set this
+                    readStrategy.read(split.splitId(), "", output);
                 } catch (Exception e) {
                     String errorMsg =
                             String.format("Read data from this file [%s] 
failed", split.splitId());
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ExcelReadStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ExcelReadStrategy.java
index 02f6c30772..ba7865b315 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ExcelReadStrategy.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ExcelReadStrategy.java
@@ -74,7 +74,7 @@ public class ExcelReadStrategy extends AbstractReadStrategy {
 
     @SneakyThrows
     @Override
-    public void read(String path, Collector<SeaTunnelRow> output) {
+    public void read(String path, String tableId, Collector<SeaTunnelRow> 
output) {
         Configuration conf = getConfiguration();
         FileSystem fs = FileSystem.get(conf);
         Map<String, String> partitionsMap = parsePartitionsByPath(path);
@@ -124,6 +124,7 @@ public class ExcelReadStrategy extends AbstractReadStrategy 
{
                                     seaTunnelRow.setField(index++, value);
                                 }
                             }
+                            seaTunnelRow.setTableId(tableId);
                             output.collect(seaTunnelRow);
                         });
     }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/JsonReadStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/JsonReadStrategy.java
index 9095f97236..d8dccd86de 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/JsonReadStrategy.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/JsonReadStrategy.java
@@ -69,7 +69,7 @@ public class JsonReadStrategy extends AbstractReadStrategy {
     }
 
     @Override
-    public void read(String path, Collector<SeaTunnelRow> output)
+    public void read(String path, String tableId, Collector<SeaTunnelRow> 
output)
             throws FileConnectorException, IOException {
         Configuration conf = getConfiguration();
         FileSystem fs = FileSystem.get(conf);
@@ -105,6 +105,7 @@ public class JsonReadStrategy extends AbstractReadStrategy {
                                             seaTunnelRow.setField(index++, 
value);
                                         }
                                     }
+                                    seaTunnelRow.setTableId(tableId);
                                     output.collect(seaTunnelRow);
                                 } catch (IOException e) {
                                     String errorMsg =
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/OrcReadStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/OrcReadStrategy.java
index 4bdbac1e96..56c782e3d0 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/OrcReadStrategy.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/OrcReadStrategy.java
@@ -74,7 +74,7 @@ public class OrcReadStrategy extends AbstractReadStrategy {
     private static final long MIN_SIZE = 16 * 1024;
 
     @Override
-    public void read(String path, Collector<SeaTunnelRow> output)
+    public void read(String path, String tableId, Collector<SeaTunnelRow> 
output)
             throws FileConnectorException, IOException {
         if (Boolean.FALSE.equals(checkFileType(path))) {
             String errorMsg =
@@ -120,6 +120,7 @@ public class OrcReadStrategy extends AbstractReadStrategy {
                         }
                     }
                     SeaTunnelRow seaTunnelRow = new SeaTunnelRow(fields);
+                    seaTunnelRow.setTableId(tableId);
                     output.collect(seaTunnelRow);
                     num++;
                 }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
index 944d3ee86a..e7a0c0af4a 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
@@ -81,7 +81,7 @@ public class ParquetReadStrategy extends AbstractReadStrategy 
{
     private int[] indexes;
 
     @Override
-    public void read(String path, Collector<SeaTunnelRow> output)
+    public void read(String path, String tableId, Collector<SeaTunnelRow> 
output)
             throws FileConnectorException, IOException {
         if (Boolean.FALSE.equals(checkFileType(path))) {
             String errorMsg =
@@ -119,6 +119,7 @@ public class ParquetReadStrategy extends 
AbstractReadStrategy {
                     fields[i] = resolveObject(data, 
seaTunnelRowType.getFieldType(i));
                 }
                 SeaTunnelRow seaTunnelRow = new SeaTunnelRow(fields);
+                seaTunnelRow.setTableId(tableId);
                 output.collect(seaTunnelRow);
             }
         }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategy.java
index b53e97140e..3f1a869b18 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategy.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategy.java
@@ -36,17 +36,20 @@ public interface ReadStrategy extends Serializable {
 
     Configuration getConfiguration(HadoopConf conf);
 
-    void read(String path, Collector<SeaTunnelRow> output)
+    void read(String path, String tableId, Collector<SeaTunnelRow> output)
             throws IOException, FileConnectorException;
 
     SeaTunnelRowType getSeaTunnelRowTypeInfo(HadoopConf hadoopConf, String 
path)
             throws FileConnectorException;
 
+    // todo: use CatalogTable
     void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType);
 
     List<String> getFileNamesByPath(HadoopConf hadoopConf, String path) throws 
IOException;
 
+    // todo: use ReadonlyConfig
     void setPluginConfig(Config pluginConfig);
 
+    // todo: use CatalogTable
     SeaTunnelRowType getActualSeaTunnelRowTypeInfo();
 }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategyFactory.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategyFactory.java
index 3aa1874edf..777490c031 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategyFactory.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategyFactory.java
@@ -17,8 +17,11 @@
 
 package org.apache.seatunnel.connectors.seatunnel.file.source.reader;
 
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
+import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfig;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
 import 
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
 
 import lombok.extern.slf4j.Slf4j;
@@ -28,6 +31,14 @@ public class ReadStrategyFactory {
 
     private ReadStrategyFactory() {}
 
+    public static ReadStrategy of(ReadonlyConfig readonlyConfig, HadoopConf 
hadoopConf) {
+        ReadStrategy readStrategy =
+                
of(readonlyConfig.get(BaseSourceConfig.FILE_FORMAT_TYPE).name());
+        readStrategy.setPluginConfig(readonlyConfig.toConfig());
+        readStrategy.init(hadoopConf);
+        return readStrategy;
+    }
+
     public static ReadStrategy of(String fileType) {
         try {
             FileFormat fileFormat = FileFormat.valueOf(fileType.toUpperCase());
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
index 816e50b57b..586d165e1b 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
@@ -64,7 +64,7 @@ public class TextReadStrategy extends AbstractReadStrategy {
     private int[] indexes;
 
     @Override
-    public void read(String path, Collector<SeaTunnelRow> output)
+    public void read(String path, String tableId, Collector<SeaTunnelRow> 
output)
             throws FileConnectorException, IOException {
         Configuration conf = getConfiguration();
         FileSystem fs = FileSystem.get(conf);
@@ -118,6 +118,7 @@ public class TextReadStrategy extends AbstractReadStrategy {
                                             seaTunnelRow.setField(index++, 
value);
                                         }
                                     }
+                                    seaTunnelRow.setTableId(tableId);
                                     output.collect(seaTunnelRow);
                                 } catch (IOException e) {
                                     String errorMsg =
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/OrcReadStrategyTest.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/OrcReadStrategyTest.java
index 5e8eb9a2c8..56fbaae386 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/OrcReadStrategyTest.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/OrcReadStrategyTest.java
@@ -52,7 +52,7 @@ public class OrcReadStrategyTest {
                 orcReadStrategy.getSeaTunnelRowTypeInfo(localConf, 
orcFilePath);
         Assertions.assertNotNull(seaTunnelRowTypeInfo);
         System.out.println(seaTunnelRowTypeInfo);
-        orcReadStrategy.read(orcFilePath, testCollector);
+        orcReadStrategy.read(orcFilePath, "", testCollector);
         for (SeaTunnelRow row : testCollector.getRows()) {
             Assertions.assertEquals(row.getField(0).getClass(), Boolean.class);
             Assertions.assertEquals(row.getField(1).getClass(), Byte.class);
@@ -78,7 +78,7 @@ public class OrcReadStrategyTest {
                 orcReadStrategy.getSeaTunnelRowTypeInfo(localConf, 
orcFilePath);
         Assertions.assertNotNull(seaTunnelRowTypeInfo);
         System.out.println(seaTunnelRowTypeInfo);
-        orcReadStrategy.read(orcFilePath, testCollector);
+        orcReadStrategy.read(orcFilePath, "", testCollector);
         for (SeaTunnelRow row : testCollector.getRows()) {
             Assertions.assertEquals(row.getField(0).getClass(), Byte.class);
             Assertions.assertEquals(row.getField(1).getClass(), Boolean.class);
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetReadStrategyTest.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetReadStrategyTest.java
index 0a3bee6282..1c36a91453 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetReadStrategyTest.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetReadStrategyTest.java
@@ -53,7 +53,7 @@ public class ParquetReadStrategyTest {
         Assertions.assertNotNull(seaTunnelRowTypeInfo);
         System.out.println(seaTunnelRowTypeInfo);
         TestCollector testCollector = new TestCollector();
-        parquetReadStrategy.read(path, testCollector);
+        parquetReadStrategy.read(path, "", testCollector);
     }
 
     @Test
@@ -69,7 +69,7 @@ public class ParquetReadStrategyTest {
         Assertions.assertNotNull(seaTunnelRowTypeInfo);
         System.out.println(seaTunnelRowTypeInfo);
         TestCollector testCollector = new TestCollector();
-        parquetReadStrategy.read(path, testCollector);
+        parquetReadStrategy.read(path, "", testCollector);
     }
 
     @Test
@@ -88,13 +88,13 @@ public class ParquetReadStrategyTest {
         TimeZone tz1 = TimeZone.getTimeZone("Asia/Shanghai");
         TimeZone.setDefault(tz1);
         TestCollector testCollector = new TestCollector();
-        parquetReadStrategy.read(path, testCollector);
+        parquetReadStrategy.read(path, "", testCollector);
         LocalDateTime time1 = (LocalDateTime) 
testCollector.getRows().get(0).getField(index);
 
         TimeZone tz2 = TimeZone.getTimeZone("UTC");
         TimeZone.setDefault(tz2);
         TestCollector testCollector2 = new TestCollector();
-        parquetReadStrategy.read(path, testCollector2);
+        parquetReadStrategy.read(path, "", testCollector2);
         LocalDateTime time2 = (LocalDateTime) 
testCollector2.getRows().get(0).getField(index);
 
         Assertions.assertTrue(time1.isAfter(time2));
@@ -121,7 +121,7 @@ public class ParquetReadStrategyTest {
         Assertions.assertNotNull(seaTunnelRowTypeInfo);
         System.out.println(seaTunnelRowTypeInfo);
         TestCollector testCollector = new TestCollector();
-        parquetReadStrategy.read(path, testCollector);
+        parquetReadStrategy.read(path, "", testCollector);
         List<SeaTunnelRow> rows = testCollector.getRows();
         for (SeaTunnelRow row : rows) {
             Assertions.assertEquals(row.getField(0).getClass(), Long.class);
@@ -151,7 +151,7 @@ public class ParquetReadStrategyTest {
         Assertions.assertNotNull(seaTunnelRowTypeInfo);
         System.out.println(seaTunnelRowTypeInfo);
         TestCollector testCollector = new TestCollector();
-        parquetReadStrategy.read(path, testCollector);
+        parquetReadStrategy.read(path, "", testCollector);
     }
 
     public static class TestCollector implements Collector<SeaTunnelRow> {
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/config/LocalConf.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/config/LocalFileHadoopConf.java
similarity index 85%
rename from 
seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/config/LocalConf.java
rename to 
seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/config/LocalFileHadoopConf.java
index b1f3f35828..284167ce7d 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/config/LocalConf.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/config/LocalFileHadoopConf.java
@@ -19,12 +19,14 @@ package 
org.apache.seatunnel.connectors.seatunnel.file.local.config;
 
 import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
 
-public class LocalConf extends HadoopConf {
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+
+public class LocalFileHadoopConf extends HadoopConf {
     private static final String HDFS_IMPL = 
"org.apache.hadoop.fs.LocalFileSystem";
     private static final String SCHEMA = "file";
 
-    public LocalConf(String hdfsNameKey) {
-        super(hdfsNameKey);
+    public LocalFileHadoopConf() {
+        super(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_DEFAULT);
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSink.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSink.java
index a11b8067a7..4d8037ef5f 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSink.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSink.java
@@ -22,11 +22,9 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
 import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
-import org.apache.seatunnel.connectors.seatunnel.file.local.config.LocalConf;
+import 
org.apache.seatunnel.connectors.seatunnel.file.local.config.LocalFileHadoopConf;
 import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSink;
 
-import org.apache.hadoop.fs.CommonConfigurationKeys;
-
 import com.google.auto.service.AutoService;
 
 @AutoService(SeaTunnelSink.class)
@@ -40,6 +38,6 @@ public class LocalFileSink extends BaseFileSink {
     @Override
     public void prepare(Config pluginConfig) throws PrepareFailException {
         super.prepare(pluginConfig);
-        hadoopConf = new 
LocalConf(CommonConfigurationKeys.FS_DEFAULT_NAME_DEFAULT);
+        hadoopConf = new LocalFileHadoopConf();
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java
index 8ff31155b8..76ee4e452e 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java
@@ -17,35 +17,37 @@
 
 package org.apache.seatunnel.connectors.seatunnel.file.local.source;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
-import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
-import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.common.constants.PluginType;
-import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
-import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.source.SupportColumnProjection;
+import org.apache.seatunnel.api.source.SupportParallelism;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
-import 
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
-import 
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
-import org.apache.seatunnel.connectors.seatunnel.file.local.config.LocalConf;
-import 
org.apache.seatunnel.connectors.seatunnel.file.local.source.config.LocalSourceConfig;
-import org.apache.seatunnel.connectors.seatunnel.file.source.BaseFileSource;
-import 
org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategyFactory;
+import 
org.apache.seatunnel.connectors.seatunnel.file.local.source.config.LocalFileSourceConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.file.local.source.config.MultipleTableLocalFileSourceConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.file.local.source.reader.MultipleTableLocalFileSourceReader;
+import 
org.apache.seatunnel.connectors.seatunnel.file.local.source.split.LocalFileSourceSplit;
+import 
org.apache.seatunnel.connectors.seatunnel.file.local.source.split.MultipleTableLocalFileSourceSplitEnumerator;
+import 
org.apache.seatunnel.connectors.seatunnel.file.local.source.state.LocalFileSourceState;
 
-import org.apache.hadoop.fs.CommonConfigurationKeys;
+import java.util.List;
+import java.util.stream.Collectors;
 
-import com.google.auto.service.AutoService;
+public class LocalFileSource
+        implements SeaTunnelSource<SeaTunnelRow, LocalFileSourceSplit, 
LocalFileSourceState>,
+                SupportParallelism,
+                SupportColumnProjection {
 
-import java.io.IOException;
+    private final MultipleTableLocalFileSourceConfig 
multipleTableLocalFileSourceConfig;
 
-@AutoService(SeaTunnelSource.class)
-public class LocalFileSource extends BaseFileSource {
+    public LocalFileSource(ReadonlyConfig readonlyConfig) {
+        this.multipleTableLocalFileSourceConfig =
+                new MultipleTableLocalFileSourceConfig(readonlyConfig);
+    }
 
     @Override
     public String getPluginName() {
@@ -53,75 +55,36 @@ public class LocalFileSource extends BaseFileSource {
     }
 
     @Override
-    public void prepare(Config pluginConfig) throws PrepareFailException {
-        CheckResult result =
-                CheckConfigUtil.checkAllExists(
-                        pluginConfig,
-                        LocalSourceConfig.FILE_PATH.key(),
-                        LocalSourceConfig.FILE_FORMAT_TYPE.key());
-        if (!result.isSuccess()) {
-            throw new FileConnectorException(
-                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
-                    String.format(
-                            "PluginName: %s, PluginType: %s, Message: %s",
-                            getPluginName(), PluginType.SOURCE, 
result.getMsg()));
-        }
-        readStrategy =
-                ReadStrategyFactory.of(
-                        
pluginConfig.getString(LocalSourceConfig.FILE_FORMAT_TYPE.key()));
-        readStrategy.setPluginConfig(pluginConfig);
-        String path = 
pluginConfig.getString(LocalSourceConfig.FILE_PATH.key());
-        hadoopConf = new 
LocalConf(CommonConfigurationKeys.FS_DEFAULT_NAME_DEFAULT);
-        try {
-            filePaths = readStrategy.getFileNamesByPath(hadoopConf, path);
-        } catch (IOException e) {
-            String errorMsg = String.format("Get file list from this path [%s] 
failed", path);
-            throw new FileConnectorException(
-                    FileConnectorErrorCode.FILE_LIST_GET_FAILED, errorMsg, e);
-        }
-        // support user-defined schema
-        FileFormat fileFormat =
-                FileFormat.valueOf(
-                        pluginConfig
-                                
.getString(LocalSourceConfig.FILE_FORMAT_TYPE.key())
-                                .toUpperCase());
-        // only json text csv type support user-defined schema now
-        if (pluginConfig.hasPath(TableSchemaOptions.SCHEMA.key())) {
-            switch (fileFormat) {
-                case CSV:
-                case TEXT:
-                case JSON:
-                case EXCEL:
-                    SeaTunnelRowType userDefinedSchema =
-                            
CatalogTableUtil.buildWithConfig(pluginConfig).getSeaTunnelRowType();
-                    readStrategy.setSeaTunnelRowTypeInfo(userDefinedSchema);
-                    rowType = readStrategy.getActualSeaTunnelRowTypeInfo();
-                    break;
-                case ORC:
-                case PARQUET:
-                    throw new FileConnectorException(
-                            CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION,
-                            "SeaTunnel does not support user-defined schema 
for [parquet, orc] files");
-                default:
-                    // never got in there
-                    throw new FileConnectorException(
-                            CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
-                            "SeaTunnel does not supported this file format");
-            }
-        } else {
-            if (filePaths.isEmpty()) {
-                // When the directory is empty, distribute default behavior 
schema
-                rowType = CatalogTableUtil.buildSimpleTextSchema();
-                return;
-            }
-            try {
-                rowType = readStrategy.getSeaTunnelRowTypeInfo(hadoopConf, 
filePaths.get(0));
-            } catch (FileConnectorException e) {
-                String errorMsg =
-                        String.format("Get table schema from file [%s] 
failed", filePaths.get(0));
-                throw new FileConnectorException(
-                        CommonErrorCodeDeprecated.TABLE_SCHEMA_GET_FAILED, 
errorMsg, e);
-            }
-        }
+    public Boundedness getBoundedness() {
+        return Boundedness.BOUNDED;
+    }
+
+    @Override
+    public List<CatalogTable> getProducedCatalogTables() {
+        return 
multipleTableLocalFileSourceConfig.getLocalFileSourceConfigs().stream()
+                .map(LocalFileSourceConfig::getCatalogTable)
+                .collect(Collectors.toList());
+    }
+
+    @Override
+    public SourceReader<SeaTunnelRow, LocalFileSourceSplit> createReader(
+            SourceReader.Context readerContext) {
+        return new MultipleTableLocalFileSourceReader(
+                readerContext, multipleTableLocalFileSourceConfig);
+    }
+
+    @Override
+    public SourceSplitEnumerator<LocalFileSourceSplit, LocalFileSourceState> 
createEnumerator(
+            SourceSplitEnumerator.Context<LocalFileSourceSplit> 
enumeratorContext) {
+        return new MultipleTableLocalFileSourceSplitEnumerator(
+                enumeratorContext, multipleTableLocalFileSourceConfig);
+    }
+
+    @Override
+    public SourceSplitEnumerator<LocalFileSourceSplit, LocalFileSourceState> 
restoreEnumerator(
+            SourceSplitEnumerator.Context<LocalFileSourceSplit> 
enumeratorContext,
+            LocalFileSourceState checkpointState) {
+        return new MultipleTableLocalFileSourceSplitEnumerator(
+                enumeratorContext, multipleTableLocalFileSourceConfig, 
checkpointState);
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java
index cf34c99a5b..0e1b8163f8 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java
@@ -19,16 +19,20 @@ package 
org.apache.seatunnel.connectors.seatunnel.file.local.source;
 
 import org.apache.seatunnel.api.configuration.util.OptionRule;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceSplit;
 import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
+import org.apache.seatunnel.api.table.connector.TableSource;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
 import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfig;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
-import 
org.apache.seatunnel.connectors.seatunnel.file.local.source.config.LocalSourceConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.file.local.source.config.LocalFileSourceOptions;
 
 import com.google.auto.service.AutoService;
 
+import java.io.Serializable;
 import java.util.Arrays;
 
 @AutoService(Factory.class)
@@ -38,11 +42,18 @@ public class LocalFileSourceFactory implements 
TableSourceFactory {
         return FileSystemType.LOCAL.getFileSystemPluginName();
     }
 
+    @Override
+    public <T, SplitT extends SourceSplit, StateT extends Serializable>
+            TableSource<T, SplitT, StateT> 
createSource(TableSourceFactoryContext context) {
+        return () -> (SeaTunnelSource<T, SplitT, StateT>) new 
LocalFileSource(context.getOptions());
+    }
+
     @Override
     public OptionRule optionRule() {
         return OptionRule.builder()
-                .required(LocalSourceConfig.FILE_PATH)
-                .required(BaseSourceConfig.FILE_FORMAT_TYPE)
+                .optional(LocalFileSourceOptions.tables_configs)
+                .optional(BaseSourceConfig.FILE_PATH)
+                .optional(BaseSourceConfig.FILE_FORMAT_TYPE)
                 .conditional(
                         BaseSourceConfig.FILE_FORMAT_TYPE,
                         FileFormat.TEXT,
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/config/LocalFileSourceConfig.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/config/LocalFileSourceConfig.java
new file mode 100644
index 0000000000..f2fd7b8808
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/config/LocalFileSourceConfig.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.local.source.config;
+
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
+import 
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
+import 
org.apache.seatunnel.connectors.seatunnel.file.local.config.LocalFileHadoopConf;
+import 
org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategy;
+import 
org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategyFactory;
+
+import org.apache.commons.collections4.CollectionUtils;
+
+import lombok.Getter;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+@Getter
+public class LocalFileSourceConfig implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private final CatalogTable catalogTable;
+    private final FileFormat fileFormat;
+    private final ReadStrategy readStrategy;
+    private final List<String> filePaths;
+    private final LocalFileHadoopConf localFileHadoopConf;
+
+    public LocalFileSourceConfig(ReadonlyConfig readonlyConfig) {
+        validateConfig(readonlyConfig);
+        this.fileFormat = 
readonlyConfig.get(LocalFileSourceOptions.FILE_FORMAT_TYPE);
+        this.localFileHadoopConf = new LocalFileHadoopConf();
+        this.readStrategy = ReadStrategyFactory.of(readonlyConfig, 
localFileHadoopConf);
+        this.filePaths = parseFilePaths(readonlyConfig);
+        this.catalogTable = parseCatalogTable(readonlyConfig);
+    }
+
+    private void validateConfig(ReadonlyConfig readonlyConfig) {
+        if 
(!readonlyConfig.getOptional(LocalFileSourceOptions.FILE_PATH).isPresent()) {
+            throw new FileConnectorException(
+                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+                    String.format(
+                            "PluginName: %s, PluginType: %s, Message: %s",
+                            FileSystemType.LOCAL.getFileSystemPluginName(),
+                            PluginType.SOURCE,
+                            LocalFileSourceOptions.FILE_PATH + " is 
required"));
+        }
+        if 
(!readonlyConfig.getOptional(LocalFileSourceOptions.FILE_FORMAT_TYPE).isPresent())
 {
+            throw new FileConnectorException(
+                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+                    String.format(
+                            "PluginName: %s, PluginType: %s, Message: %s",
+                            FileSystemType.LOCAL.getFileSystemPluginName(),
+                            PluginType.SOURCE,
+                            LocalFileSourceOptions.FILE_FORMAT_TYPE.key() + " 
is required"));
+        }
+    }
+
+    private List<String> parseFilePaths(ReadonlyConfig readonlyConfig) {
+        String rootPath = null;
+        try {
+            rootPath = readonlyConfig.get(LocalFileSourceOptions.FILE_PATH);
+            return readStrategy.getFileNamesByPath(localFileHadoopConf, 
rootPath);
+        } catch (Exception ex) {
+            String errorMsg = String.format("Get file list from this path [%s] 
failed", rootPath);
+            throw new FileConnectorException(
+                    FileConnectorErrorCode.FILE_LIST_GET_FAILED, errorMsg, ex);
+        }
+    }
+
+    private CatalogTable parseCatalogTable(ReadonlyConfig readonlyConfig) {
+        final CatalogTable catalogTable;
+        if (readonlyConfig.getOptional(TableSchemaOptions.SCHEMA).isPresent()) 
{
+            catalogTable =
+                    CatalogTableUtil.buildWithConfig(
+                            FileSystemType.LOCAL.getFileSystemPluginName(), 
readonlyConfig);
+        } else {
+            catalogTable = CatalogTableUtil.buildSimpleTextTable();
+        }
+        if (CollectionUtils.isEmpty(filePaths)) {
+            return catalogTable;
+        }
+        switch (fileFormat) {
+            case CSV:
+            case TEXT:
+            case JSON:
+            case EXCEL:
+                
readStrategy.setSeaTunnelRowTypeInfo(catalogTable.getSeaTunnelRowType());
+                return newCatalogTable(catalogTable, 
readStrategy.getActualSeaTunnelRowTypeInfo());
+            case ORC:
+            case PARQUET:
+                return newCatalogTable(
+                        catalogTable,
+                        readStrategy.getSeaTunnelRowTypeInfo(
+                                localFileHadoopConf, filePaths.get(0)));
+            default:
+                throw new FileConnectorException(
+                        FileConnectorErrorCode.FORMAT_NOT_SUPPORT,
+                        "SeaTunnel does not supported this file format: [" + 
fileFormat + "]");
+        }
+    }
+
+    private CatalogTable newCatalogTable(
+            CatalogTable catalogTable, SeaTunnelRowType seaTunnelRowType) {
+        TableSchema tableSchema = catalogTable.getTableSchema();
+
+        Map<String, Column> columnMap =
+                tableSchema.getColumns().stream()
+                        .collect(Collectors.toMap(Column::getName, 
Function.identity()));
+        String[] fieldNames = seaTunnelRowType.getFieldNames();
+        SeaTunnelDataType<?>[] fieldTypes = seaTunnelRowType.getFieldTypes();
+
+        List<Column> finalColumns = new ArrayList<>();
+        for (int i = 0; i < fieldNames.length; i++) {
+            Column column = columnMap.get(fieldNames[i]);
+            if (column != null) {
+                finalColumns.add(column);
+            } else {
+                finalColumns.add(
+                        PhysicalColumn.of(fieldNames[i], fieldTypes[i], 0, 
false, null, null));
+            }
+        }
+
+        TableSchema finalSchema =
+                TableSchema.builder()
+                        .columns(finalColumns)
+                        .primaryKey(tableSchema.getPrimaryKey())
+                        .constraintKey(tableSchema.getConstraintKeys())
+                        .build();
+
+        return CatalogTable.of(
+                catalogTable.getTableId(),
+                finalSchema,
+                catalogTable.getOptions(),
+                catalogTable.getPartitionKeys(),
+                catalogTable.getComment(),
+                catalogTable.getCatalogName());
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/config/LocalSourceConfig.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/config/LocalFileSourceOptions.java
similarity index 58%
copy from 
seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/config/LocalSourceConfig.java
copy to 
seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/config/LocalFileSourceOptions.java
index 2f43ff8d9f..6cd689fcdc 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/config/LocalSourceConfig.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/config/LocalFileSourceOptions.java
@@ -17,6 +17,21 @@
 
 package org.apache.seatunnel.connectors.seatunnel.file.local.source.config;
 
+import 
org.apache.seatunnel.shade.com.fasterxml.jackson.core.type.TypeReference;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
 import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfig;
 
-public class LocalSourceConfig extends BaseSourceConfig {}
+import java.util.List;
+import java.util.Map;
+
+public final class LocalFileSourceOptions extends BaseSourceConfig {
+
+    public static final Option<List<Map<String, Object>>> tables_configs =
+            Options.key("tables_configs")
+                    .type(new TypeReference<List<Map<String, Object>>>() {})
+                    .noDefaultValue()
+                    .withDescription(
+                            "Local file source configs, used to create 
multiple local file source.");
+}
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/config/MultipleTableLocalFileSourceConfig.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/config/MultipleTableLocalFileSourceConfig.java
new file mode 100644
index 0000000000..0abbaa511e
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/config/MultipleTableLocalFileSourceConfig.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.local.source.config;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+
+import com.google.common.collect.Lists;
+import lombok.Getter;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class MultipleTableLocalFileSourceConfig implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    @Getter private List<LocalFileSourceConfig> localFileSourceConfigs;
+
+    public MultipleTableLocalFileSourceConfig(ReadonlyConfig 
localFileSourceRootConfig) {
+        if (localFileSourceRootConfig
+                .getOptional(LocalFileSourceOptions.tables_configs)
+                .isPresent()) {
+            parseFromLocalFileSourceConfigs(localFileSourceRootConfig);
+        } else {
+            parseFromLocalFileSourceConfig(localFileSourceRootConfig);
+        }
+    }
+
+    private void parseFromLocalFileSourceConfigs(ReadonlyConfig 
localFileSourceRootConfig) {
+        this.localFileSourceConfigs =
+                
localFileSourceRootConfig.get(LocalFileSourceOptions.tables_configs).stream()
+                        .map(ReadonlyConfig::fromMap)
+                        .map(LocalFileSourceConfig::new)
+                        .collect(Collectors.toList());
+    }
+
+    private void parseFromLocalFileSourceConfig(ReadonlyConfig 
localFileSourceRootConfig) {
+        LocalFileSourceConfig localFileSourceConfig =
+                new LocalFileSourceConfig(localFileSourceRootConfig);
+        this.localFileSourceConfigs = 
Lists.newArrayList(localFileSourceConfig);
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/reader/MultipleTableLocalFileSourceReader.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/reader/MultipleTableLocalFileSourceReader.java
new file mode 100644
index 0000000000..bd990db50f
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/reader/MultipleTableLocalFileSourceReader.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.local.source.reader;
+
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import 
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
+import 
org.apache.seatunnel.connectors.seatunnel.file.local.source.config.LocalFileSourceConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.file.local.source.config.MultipleTableLocalFileSourceConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.file.local.source.split.LocalFileSourceSplit;
+import 
org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategy;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.stream.Collectors;
+
+import static 
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode.FILE_READ_FAILED;
+import static 
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode.FILE_READ_STRATEGY_NOT_SUPPORT;
+
+@Slf4j
+public class MultipleTableLocalFileSourceReader
+        implements SourceReader<SeaTunnelRow, LocalFileSourceSplit> {
+
+    private final SourceReader.Context context;
+    private volatile boolean noMoreSplit;
+
+    private final Deque<LocalFileSourceSplit> sourceSplits = new 
ConcurrentLinkedDeque<>();
+
+    private final Map<String, ReadStrategy> readStrategyMap;
+
+    public MultipleTableLocalFileSourceReader(
+            SourceReader.Context context,
+            MultipleTableLocalFileSourceConfig 
multipleTableLocalFileSourceConfig) {
+        this.context = context;
+        this.readStrategyMap =
+                
multipleTableLocalFileSourceConfig.getLocalFileSourceConfigs().stream()
+                        .collect(
+                                Collectors.toMap(
+                                        localFileSourceConfig ->
+                                                localFileSourceConfig
+                                                        .getCatalogTable()
+                                                        .getTableId()
+                                                        .toTablePath()
+                                                        .toString(),
+                                        
LocalFileSourceConfig::getReadStrategy));
+    }
+
+    @Override
+    public void pollNext(Collector<SeaTunnelRow> output) {
+        synchronized (output.getCheckpointLock()) {
+            LocalFileSourceSplit split = sourceSplits.poll();
+            if (null != split) {
+                ReadStrategy readStrategy = 
readStrategyMap.get(split.getTableId());
+                if (readStrategy == null) {
+                    throw new FileConnectorException(
+                            FILE_READ_STRATEGY_NOT_SUPPORT,
+                            "Cannot found the read strategy for this table: ["
+                                    + split.getTableId()
+                                    + "]");
+                }
+                try {
+                    readStrategy.read(split.getFilePath(), split.getTableId(), 
output);
+                } catch (Exception e) {
+                    String errorMsg =
+                            String.format("Read data from this file [%s] 
failed", split.splitId());
+                    throw new FileConnectorException(FILE_READ_FAILED, 
errorMsg, e);
+                }
+            } else if (noMoreSplit && sourceSplits.isEmpty()) {
+                // signal to the source that we have reached the end of the 
data.
+                log.info(
+                        "There is no more element for the bounded 
MultipleTableLocalFileSourceReader");
+                context.signalNoMoreElement();
+            }
+        }
+    }
+
+    @Override
+    public List<LocalFileSourceSplit> snapshotState(long checkpointId) {
+        return new ArrayList<>(sourceSplits);
+    }
+
+    @Override
+    public void addSplits(List<LocalFileSourceSplit> splits) {
+        sourceSplits.addAll(splits);
+    }
+
+    @Override
+    public void handleNoMoreSplits() {
+        noMoreSplit = true;
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) {
+        // do nothing
+    }
+
+    @Override
+    public void open() throws Exception {
+        // do nothing
+        log.info("Opened the MultipleTableLocalFileSourceReader");
+    }
+
+    @Override
+    public void close() throws IOException {
+        // do nothing
+        log.info("Closed the MultipleTableLocalFileSourceReader");
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/config/LocalSourceConfig.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/split/LocalFileSourceSplit.java
similarity index 62%
copy from 
seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/config/LocalSourceConfig.java
copy to 
seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/split/LocalFileSourceSplit.java
index 2f43ff8d9f..89bab1bee4 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/config/LocalSourceConfig.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/split/LocalFileSourceSplit.java
@@ -15,8 +15,26 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.seatunnel.file.local.source.config;
+package org.apache.seatunnel.connectors.seatunnel.file.local.source.split;
 
-import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfig;
+import org.apache.seatunnel.api.source.SourceSplit;
 
-public class LocalSourceConfig extends BaseSourceConfig {}
+import lombok.Getter;
+
+public class LocalFileSourceSplit implements SourceSplit {
+
+    private static final long serialVersionUID = 1L;
+
+    @Getter private final String tableId;
+    @Getter private final String filePath;
+
+    public LocalFileSourceSplit(String tableId, String filePath) {
+        this.tableId = tableId;
+        this.filePath = filePath;
+    }
+
+    @Override
+    public String splitId() {
+        return tableId + "_" + filePath;
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/split/MultipleTableLocalFileSourceSplitEnumerator.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/split/MultipleTableLocalFileSourceSplitEnumerator.java
new file mode 100644
index 0000000000..f00885f496
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/split/MultipleTableLocalFileSourceSplitEnumerator.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.local.source.split;
+
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import 
org.apache.seatunnel.connectors.seatunnel.file.local.source.config.LocalFileSourceConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.file.local.source.config.MultipleTableLocalFileSourceConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.file.local.source.state.LocalFileSourceState;
+
+import org.apache.commons.collections4.CollectionUtils;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+@Slf4j
+public class MultipleTableLocalFileSourceSplitEnumerator
+        implements SourceSplitEnumerator<LocalFileSourceSplit, 
LocalFileSourceState> {
+
+    private final SourceSplitEnumerator.Context<LocalFileSourceSplit> context;
+    private final Set<LocalFileSourceSplit> pendingSplit;
+    private final Set<LocalFileSourceSplit> assignedSplit;
+    private final Map<String, List<String>> filePathMap;
+
+    public MultipleTableLocalFileSourceSplitEnumerator(
+            SourceSplitEnumerator.Context<LocalFileSourceSplit> context,
+            MultipleTableLocalFileSourceConfig 
multipleTableLocalFileSourceConfig) {
+        this.context = context;
+        this.filePathMap =
+                
multipleTableLocalFileSourceConfig.getLocalFileSourceConfigs().stream()
+                        .collect(
+                                Collectors.toMap(
+                                        localFileSourceConfig ->
+                                                localFileSourceConfig
+                                                        .getCatalogTable()
+                                                        .getTableId()
+                                                        .toTablePath()
+                                                        .toString(),
+                                        LocalFileSourceConfig::getFilePaths));
+        this.assignedSplit = new HashSet<>();
+        this.pendingSplit = new HashSet<>();
+    }
+
+    public MultipleTableLocalFileSourceSplitEnumerator(
+            SourceSplitEnumerator.Context<LocalFileSourceSplit> context,
+            MultipleTableLocalFileSourceConfig 
multipleTableLocalFileSourceConfig,
+            LocalFileSourceState localFileSourceState) {
+        this(context, multipleTableLocalFileSourceConfig);
+        this.assignedSplit.addAll(localFileSourceState.getAssignedSplit());
+    }
+
+    @Override
+    public void addSplitsBack(List<LocalFileSourceSplit> splits, int 
subtaskId) {
+        if (CollectionUtils.isEmpty(splits)) {
+            return;
+        }
+        pendingSplit.addAll(splits);
+        assignSplit(subtaskId);
+    }
+
+    @Override
+    public int currentUnassignedSplitSize() {
+        return pendingSplit.size();
+    }
+
+    @Override
+    public void handleSplitRequest(int subtaskId) {}
+
+    @Override
+    public void registerReader(int subtaskId) {
+        for (Map.Entry<String, List<String>> filePathEntry : 
filePathMap.entrySet()) {
+            String tableId = filePathEntry.getKey();
+            List<String> filePaths = filePathEntry.getValue();
+            for (String filePath : filePaths) {
+                pendingSplit.add(new LocalFileSourceSplit(tableId, filePath));
+            }
+        }
+        assignSplit(subtaskId);
+    }
+
+    @Override
+    public LocalFileSourceState snapshotState(long checkpointId) {
+        return new LocalFileSourceState(assignedSplit);
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) {
+        // do nothing.
+    }
+
+    private void assignSplit(int taskId) {
+        List<LocalFileSourceSplit> currentTaskSplits = new ArrayList<>();
+        if (context.currentParallelism() == 1) {
+            // if parallelism == 1, we should assign all the splits to reader
+            currentTaskSplits.addAll(pendingSplit);
+        } else {
+            // if parallelism > 1, according to hashCode of split's id to 
determine whether to
+            // allocate the current task
+            for (LocalFileSourceSplit fileSourceSplit : pendingSplit) {
+                int splitOwner =
+                        getSplitOwner(fileSourceSplit.splitId(), 
context.currentParallelism());
+                if (splitOwner == taskId) {
+                    currentTaskSplits.add(fileSourceSplit);
+                }
+            }
+        }
+        // assign splits
+        context.assignSplit(taskId, currentTaskSplits);
+        // save the state of assigned splits
+        assignedSplit.addAll(currentTaskSplits);
+        // remove the assigned splits from pending splits
+        currentTaskSplits.forEach(pendingSplit::remove);
+        log.info(
+                "SubTask {} is assigned to [{}]",
+                taskId,
+                currentTaskSplits.stream()
+                        .map(LocalFileSourceSplit::splitId)
+                        .collect(Collectors.joining(",")));
+        context.signalNoMoreSplits(taskId);
+    }
+
+    private static int getSplitOwner(String tp, int numReaders) {
+        return (tp.hashCode() & Integer.MAX_VALUE) % numReaders;
+    }
+
+    @Override
+    public void open() {
+        // do nothing
+    }
+
+    @Override
+    public void run() throws Exception {
+        // do nothing
+    }
+
+    @Override
+    public void close() throws IOException {
+        // do nothing
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/config/LocalSourceConfig.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/state/LocalFileSourceState.java
similarity index 60%
rename from 
seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/config/LocalSourceConfig.java
rename to 
seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/state/LocalFileSourceState.java
index 2f43ff8d9f..2cc09f92ff 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/config/LocalSourceConfig.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/state/LocalFileSourceState.java
@@ -15,8 +15,24 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.seatunnel.file.local.source.config;
+package org.apache.seatunnel.connectors.seatunnel.file.local.source.state;
 
-import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.file.local.source.split.LocalFileSourceSplit;
 
-public class LocalSourceConfig extends BaseSourceConfig {}
+import java.io.Serializable;
+import java.util.Set;
+
+public class LocalFileSourceState implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private final Set<LocalFileSourceSplit> assignedSplit;
+
+    public LocalFileSourceState(Set<LocalFileSourceSplit> assignedSplit) {
+        this.assignedSplit = assignedSplit;
+    }
+
+    public Set<LocalFileSourceSplit> getAssignedSplit() {
+        return assignedSplit;
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileWithMultipleTableIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileWithMultipleTableIT.java
new file mode 100644
index 0000000000..10d1a63429
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileWithMultipleTableIT.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.file.local;
+
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
+import org.apache.seatunnel.e2e.common.container.EngineType;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.container.TestHelper;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
+import org.apache.seatunnel.e2e.common.util.ContainerUtil;
+
+import org.junit.jupiter.api.TestTemplate;
+
+import java.io.IOException;
+
+@DisabledOnContainer(
+        value = {},
+        type = {EngineType.SPARK, EngineType.FLINK},
+        disabledReason = "Currently SPARK and FLINK do not support multi 
table")
+public class LocalFileWithMultipleTableIT extends TestSuiteBase {
+
+    /** Copy data files to container */
+    @TestContainerExtension
+    private final ContainerExtendedFactory extendedFactory =
+            container -> {
+                ContainerUtil.copyFileIntoContainers(
+                        "/excel/e2e.xlsx",
+                        
"/seatunnel/read/excel/name=tyrantlucifer/hobby=coding/e2e.xlsx",
+                        container);
+
+                ContainerUtil.copyFileIntoContainers(
+                        "/json/e2e.json",
+                        
"/seatunnel/read/json/name=tyrantlucifer/hobby=coding/e2e.json",
+                        container);
+
+                ContainerUtil.copyFileIntoContainers(
+                        "/orc/e2e.orc",
+                        
"/seatunnel/read/orc/name=tyrantlucifer/hobby=coding/e2e.orc",
+                        container);
+
+                ContainerUtil.copyFileIntoContainers(
+                        "/parquet/e2e.parquet",
+                        
"/seatunnel/read/parquet/name=tyrantlucifer/hobby=coding/e2e.parquet",
+                        container);
+
+                ContainerUtil.copyFileIntoContainers(
+                        "/text/e2e.txt",
+                        
"/seatunnel/read/text/name=tyrantlucifer/hobby=coding/e2e.txt",
+                        container);
+            };
+
+    @TestTemplate
+    public void 
testLocalFileReadAndWriteInMultipleTableMode_excel(TestContainer container)
+            throws IOException, InterruptedException {
+        TestHelper helper = new TestHelper(container);
+        helper.execute("/excel/local_excel_to_assert_with_multipletable.conf");
+    }
+
+    @TestTemplate
+    public void 
testLocalFileReadAndWriteInMultipleTableMode_json(TestContainer container)
+            throws IOException, InterruptedException {
+        TestHelper helper = new TestHelper(container);
+        
helper.execute("/json/local_file_json_to_assert_with_multipletable.conf");
+    }
+
+    @TestTemplate
+    public void testLocalFileReadAndWriteInMultipleTableMode_orc(TestContainer 
container)
+            throws IOException, InterruptedException {
+        TestHelper helper = new TestHelper(container);
+        
helper.execute("/orc/local_file_orc_to_assert_with_multipletable.conf");
+    }
+
+    @TestTemplate
+    public void 
testLocalFileReadAndWriteInMultipleTableMode_parquet(TestContainer container)
+            throws IOException, InterruptedException {
+        TestHelper helper = new TestHelper(container);
+        
helper.execute("/parquet/local_file_parquet_to_assert_with_multipletable.conf");
+    }
+
+    @TestTemplate
+    public void 
testLocalFileReadAndWriteInMultipleTableMode_text(TestContainer container)
+            throws IOException, InterruptedException {
+        TestHelper helper = new TestHelper(container);
+        
helper.execute("/text/local_file_text_to_assert_with_multipletable.conf");
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/excel/local_excel_to_assert_with_multipletable.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/excel/local_excel_to_assert_with_multipletable.conf
new file mode 100644
index 0000000000..b1c86c665d
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/excel/local_excel_to_assert_with_multipletable.conf
@@ -0,0 +1,124 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  execution.parallelism = 1
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+  job.mode = "BATCH"
+}
+
+source {
+  LocalFile {
+    tables_configs = [
+        {
+            path = "/seatunnel/read/excel"
+            file_format_type = excel
+            field_delimiter = ;
+            skip_header_row_number = 1
+            schema = {
+              table = "fake01"
+              fields {
+                c_map = "map<string, string>"
+                c_array = "array<int>"
+                c_string = string
+                c_boolean = boolean
+                c_tinyint = tinyint
+                c_smallint = smallint
+                c_int = int
+                c_bigint = bigint
+                c_float = float
+                c_double = double
+                c_bytes = bytes
+                c_date = date
+                c_decimal = "decimal(38, 18)"
+                c_timestamp = timestamp
+                c_row = {
+                  c_map = "map<string, string>"
+                  c_array = "array<int>"
+                  c_string = string
+                  c_boolean = boolean
+                  c_tinyint = tinyint
+                  c_smallint = smallint
+                  c_int = int
+                  c_bigint = bigint
+                  c_float = float
+                  c_double = double
+                  c_bytes = bytes
+                  c_date = date
+                  c_decimal = "decimal(38, 18)"
+                  c_timestamp = timestamp
+                }
+              }
+            }
+        },
+        {
+            path = "/seatunnel/read/excel"
+            file_format_type = excel
+            field_delimiter = ;
+            skip_header_row_number = 1
+            schema = {
+              table = "fake02"
+              fields {
+                c_map = "map<string, string>"
+                c_array = "array<int>"
+                c_string = string
+                c_boolean = boolean
+                c_tinyint = tinyint
+                c_smallint = smallint
+                c_int = int
+                c_bigint = bigint
+                c_float = float
+                c_double = double
+                c_bytes = bytes
+                c_date = date
+                c_decimal = "decimal(38, 18)"
+                c_timestamp = timestamp
+                c_row = {
+                  c_map = "map<string, string>"
+                  c_array = "array<int>"
+                  c_string = string
+                  c_boolean = boolean
+                  c_tinyint = tinyint
+                  c_smallint = smallint
+                  c_int = int
+                  c_bigint = bigint
+                  c_float = float
+                  c_double = double
+                  c_bytes = bytes
+                  c_date = date
+                  c_decimal = "decimal(38, 18)"
+                  c_timestamp = timestamp
+                }
+              }
+            }
+        }
+    ]
+    result_table_name = "fake"
+  }
+}
+
+sink {
+  Assert {
+    rules {
+      table-names = ["fake01", "fake02"]
+    }
+  }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/local_file_json_to_assert_with_multipletable.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/local_file_json_to_assert_with_multipletable.conf
new file mode 100644
index 0000000000..0df06ad7ea
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/local_file_json_to_assert_with_multipletable.conf
@@ -0,0 +1,120 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  execution.parallelism = 1
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+  job.mode = "BATCH"
+}
+
+source {
+  LocalFile {
+    tables_configs = [
+      {
+          path = "/seatunnel/read/json"
+          file_format_type = "json"
+          schema = {
+            table = "fake01"
+            fields {
+              c_map = "map<string, string>"
+              c_array = "array<int>"
+              c_string = string
+              c_boolean = boolean
+              c_tinyint = tinyint
+              c_smallint = smallint
+              c_int = int
+              c_bigint = bigint
+              c_float = float
+              c_double = double
+              c_bytes = bytes
+              c_date = date
+              c_decimal = "decimal(38, 18)"
+              c_timestamp = timestamp
+              c_row = {
+                C_MAP = "map<string, string>"
+                C_ARRAY = "array<int>"
+                C_STRING = string
+                C_BOOLEAN = boolean
+                C_TINYINT = tinyint
+                C_SMALLINT = smallint
+                C_INT = int
+                C_BIGINT = bigint
+                C_FLOAT = float
+                C_DOUBLE = double
+                C_BYTES = bytes
+                C_DATE = date
+                C_DECIMAL = "decimal(38, 18)"
+                C_TIMESTAMP = timestamp
+              }
+            }
+          }
+      },
+      {
+          path = "/seatunnel/read/json"
+          file_format_type = "json"
+          schema = {
+            table = "fake02"
+            fields {
+              c_map = "map<string, string>"
+              c_array = "array<int>"
+              c_string = string
+              c_boolean = boolean
+              c_tinyint = tinyint
+              c_smallint = smallint
+              c_int = int
+              c_bigint = bigint
+              c_float = float
+              c_double = double
+              c_bytes = bytes
+              c_date = date
+              c_decimal = "decimal(38, 18)"
+              c_timestamp = timestamp
+              c_row = {
+                C_MAP = "map<string, string>"
+                C_ARRAY = "array<int>"
+                C_STRING = string
+                C_BOOLEAN = boolean
+                C_TINYINT = tinyint
+                C_SMALLINT = smallint
+                C_INT = int
+                C_BIGINT = bigint
+                C_FLOAT = float
+                C_DOUBLE = double
+                C_BYTES = bytes
+                C_DATE = date
+                C_DECIMAL = "decimal(38, 18)"
+                C_TIMESTAMP = timestamp
+              }
+            }
+          }
+      }
+    ]
+    result_table_name = "fake"
+  }
+}
+
+sink {
+  Assert {
+    rules {
+      table-names = ["fake01", "fake02"]
+    }
+  }
+}
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/local_file_to_console.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/local_file_to_console.conf
index 4595f83888..c04d448df9 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/local_file_to_console.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/local_file_to_console.conf
@@ -29,6 +29,10 @@ source {
   LocalFile {
     path = "/tmp/fake_empty"
     file_format_type = "json"
+    # schema is needed for json type
+    schema {
+
+    }
   }
 }
 
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/local_file_to_console.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/local_file_orc_to_assert_with_multipletable.conf
similarity index 69%
copy from 
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/local_file_to_console.conf
copy to 
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/local_file_orc_to_assert_with_multipletable.conf
index 4595f83888..a1df2c1d62 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/local_file_to_console.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/local_file_orc_to_assert_with_multipletable.conf
@@ -27,11 +27,30 @@ env {
 
 source {
   LocalFile {
-    path = "/tmp/fake_empty"
-    file_format_type = "json"
+    tables_configs = [
+      {
+          schema = {
+              table = "fake01"
+          }
+          path = "/seatunnel/read/orc"
+          file_format_type = "orc"
+      },
+      {
+          schema = {
+              table = "fake02"
+          }
+          path = "/seatunnel/read/orc"
+          file_format_type = "orc"
+      }
+    ]
+    result_table_name = "fake"
   }
 }
 
 sink {
-  Console {}
+  Assert {
+    rules {
+        table-names = ["fake01", "fake02"]
+    }
+  }
 }
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/local_file_to_console.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/parquet/local_file_parquet_to_assert_with_multipletable.conf
similarity index 69%
copy from 
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/local_file_to_console.conf
copy to 
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/parquet/local_file_parquet_to_assert_with_multipletable.conf
index 4595f83888..a30c72447b 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/local_file_to_console.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/parquet/local_file_parquet_to_assert_with_multipletable.conf
@@ -27,11 +27,30 @@ env {
 
 source {
   LocalFile {
-    path = "/tmp/fake_empty"
-    file_format_type = "json"
+    tables_configs = [
+      {
+          schema = {
+            table = "fake01"
+          }
+          path = "/seatunnel/read/parquet"
+          file_format_type = "parquet"
+      },
+      {
+          schema = {
+            table = "fake02"
+          }
+          path = "/seatunnel/read/parquet"
+          file_format_type = "parquet"
+      }
+    ]
+    result_table_name = "fake"
   }
 }
 
 sink {
-  Console {}
+  Assert {
+    rules {
+      table-names = ["fake01", "fake02"]
+    }
+  }
 }
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/local_file_text_to_assert_with_multipletable.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/local_file_text_to_assert_with_multipletable.conf
new file mode 100644
index 0000000000..8e4e3db71b
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/local_file_text_to_assert_with_multipletable.conf
@@ -0,0 +1,120 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  execution.parallelism = 1
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+  job.mode = "BATCH"
+}
+
+source {
+  LocalFile {
+    tables_configs = [
+      {
+        path = "/seatunnel/read/text"
+        file_format_type = "text"
+        schema = {
+          table = "fake01"
+          fields {
+            c_map = "map<string, string>"
+            c_array = "array<int>"
+            c_string = string
+            c_boolean = boolean
+            c_tinyint = tinyint
+            c_smallint = smallint
+            c_int = int
+            c_bigint = bigint
+            c_float = float
+            c_double = double
+            c_bytes = bytes
+            c_date = date
+            c_decimal = "decimal(38, 18)"
+            c_timestamp = timestamp
+            c_row = {
+              c_map = "map<string, string>"
+              c_array = "array<int>"
+              c_string = string
+              c_boolean = boolean
+              c_tinyint = tinyint
+              c_smallint = smallint
+              c_int = int
+              c_bigint = bigint
+              c_float = float
+              c_double = double
+              c_bytes = bytes
+              c_date = date
+              c_decimal = "decimal(38, 18)"
+              c_timestamp = timestamp
+            }
+          }
+        }
+      },
+      {
+          path = "/seatunnel/read/text"
+          file_format_type = "text"
+          schema = {
+            table = "fake02"
+            fields {
+              c_map = "map<string, string>"
+              c_array = "array<int>"
+              c_string = string
+              c_boolean = boolean
+              c_tinyint = tinyint
+              c_smallint = smallint
+              c_int = int
+              c_bigint = bigint
+              c_float = float
+              c_double = double
+              c_bytes = bytes
+              c_date = date
+              c_decimal = "decimal(38, 18)"
+              c_timestamp = timestamp
+              c_row = {
+                c_map = "map<string, string>"
+                c_array = "array<int>"
+                c_string = string
+                c_boolean = boolean
+                c_tinyint = tinyint
+                c_smallint = smallint
+                c_int = int
+                c_bigint = bigint
+                c_float = float
+                c_double = double
+                c_bytes = bytes
+                c_date = date
+                c_decimal = "decimal(38, 18)"
+                c_timestamp = timestamp
+              }
+            }
+          }
+      }
+    ]
+    result_table_name = "fake"
+  }
+}
+
+sink {
+  Assert {
+    rules {
+      table-names = ["fake01", "fake02"]
+    }
+  }
+}
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/write-cdc-changelog-to-kudu.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/write-cdc-changelog-to-kudu.conf
index 6ad7d6fd42..f32df5fbb7 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/write-cdc-changelog-to-kudu.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/write-cdc-changelog-to-kudu.conf
@@ -20,51 +20,51 @@ env {
   job.mode = "BATCH"
 }
 
-    source {
-      FakeSource {
-        schema = {
-          fields {
-                    id = int
-                    val_bool = boolean
-                    val_int8 = tinyint
-                    val_int16 = smallint
-                    val_int32 = int
-                    val_int64 = bigint
-                    val_float = float
-                    val_double = double
-                    val_decimal = "decimal(16, 1)"
-                    val_string = string
-                    val_unixtime_micros = timestamp
-          }
-        }
-        rows = [
-          {
-            kind = INSERT
-            fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", 
"2020-02-02T02:02:02"]
-          },
-          {
-            kind = INSERT
-            fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", 
"2020-02-02T02:02:02"]
-          },
-          {
-            kind = INSERT
-            fields = [3, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", 
"2020-02-02T02:02:02"]
-          },
-          {
-            kind = UPDATE_BEFORE
-            fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", 
"2020-02-02T02:02:02"]
-          },
-          {
-            kind = UPDATE_AFTER
-           fields = [1, true, 2, 2, 3, 4, 4.3,5.3,6.3, "NEW", 
"2020-02-02T02:02:02"]
-          },
-          {
-            kind = DELETE
-            fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", 
"2020-02-02T02:02:02"]
-          }
-        ]
+source {
+  FakeSource {
+    schema = {
+      fields {
+                id = int
+                val_bool = boolean
+                val_int8 = tinyint
+                val_int16 = smallint
+                val_int32 = int
+                val_int64 = bigint
+                val_float = float
+                val_double = double
+                val_decimal = "decimal(16, 1)"
+                val_string = string
+                val_unixtime_micros = timestamp
       }
     }
+    rows = [
+      {
+        kind = INSERT
+        fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", 
"2020-02-02T02:02:02"]
+      },
+      {
+        kind = INSERT
+        fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", 
"2020-02-02T02:02:02"]
+      },
+      {
+        kind = INSERT
+        fields = [3, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", 
"2020-02-02T02:02:02"]
+      },
+      {
+        kind = UPDATE_BEFORE
+        fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", 
"2020-02-02T02:02:02"]
+      },
+      {
+        kind = UPDATE_AFTER
+       fields = [1, true, 2, 2, 3, 4, 4.3,5.3,6.3, "NEW", 
"2020-02-02T02:02:02"]
+      },
+      {
+        kind = DELETE
+        fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", 
"2020-02-02T02:02:02"]
+      }
+    ]
+  }
+}
 
 sink {
    kudu{


Reply via email to