This is an automated email from the ASF dual-hosted git repository.

tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 717620f54 [Imprve][Connector-V2][Hive] Support read text table & 
Column projection (#4105)
717620f54 is described below

commit 717620f5423a329cee162c4cf9aa244ed1b33169
Author: Tyrantlucifer <[email protected]>
AuthorDate: Mon Feb 20 10:26:53 2023 +0800

    [Imprve][Connector-V2][Hive] Support read text table & Column projection 
(#4105)
    
    * [Imprve][Connector-V2][Hive] Support read text table
    
    * [Improve][Connector-V2][Hive] Format code with spotless
    
    * [Improve][Connector-V2][Hive] Fix unit test
    
    * [Improve][Connector-V2][Hive] Fix integration test
    
    * [Improve][Connector-V2][Hive] Optimize text format
    
    * [Improve][Connector-V2][Hive] Update e2e.txt
    
    * [Improve][Connector-V2][Hive] Optimize kerberos authentication
    
    * [Improve][Connector-V2][Hive] Improve config check logic for hive 
connectors
    
    * [Improve][Connector-V2][Hive] Support projection
    
    * [Improve][Connector-V2][Hive] Add transient keyword
    
    * [Improve][Connector-V2][Hive] Support parquet/orc column projection
    
    * [Improve][Connector-V2][Hive] Optimize code
    
    * [Improve][Connector-V2][Hive] Optimize code
    
    * [Improve][Connector-V2][Hive] Support text/json/csv column projection
    
    * [Improve][Connector-V2][Hive] Update docs
    
    * [Improve][Connector-V2][Hive] Update release-note
    
    * [Improve][Connector-V2][Hive] Add interface flag
    
    * [Improve][Connector-V2][Hive] Remove useless method
    
    * [Improve][Connector-V2][Hive] Update docs
    
    * [Improve][Connector-V2][Hive] Update docs
---
 .../connector-v2/Error-Quick-Reference-Manual.md   |   1 +
 docs/en/connector-v2/source/FtpFile.md             |  17 ++-
 docs/en/connector-v2/source/HdfsFile.md            |  17 ++-
 docs/en/connector-v2/source/Hive.md                |   5 +
 docs/en/connector-v2/source/LocalFile.md           |  15 +++
 docs/en/connector-v2/source/OssFile.md             |  17 ++-
 docs/en/connector-v2/source/OssJindoFile.md        |  15 +++
 docs/en/connector-v2/source/S3File.md              |  17 ++-
 docs/en/connector-v2/source/SftpFile.md            |  16 ++-
 release-note.md                                    |   2 +
 .../file/hdfs/source/BaseHdfsFileSource.java       |   6 +
 .../seatunnel/file/config/BaseSinkConfig.java      |   3 +-
 .../seatunnel/file/config/BaseSourceConfig.java    |   9 +-
 .../file/exception/FileConnectorErrorCode.java     |   3 +-
 .../seatunnel/file/sink/util/FileSystemUtils.java  |  34 +++++
 .../file/sink/writer/AbstractWriteStrategy.java    |  28 +---
 .../file/sink/writer/OrcWriteStrategy.java         |   2 +-
 .../file/sink/writer/ParquetWriteStrategy.java     |   2 +-
 .../seatunnel/file/source/BaseFileSource.java      |   4 +-
 .../file/source/reader/AbstractReadStrategy.java   |  38 ++----
 .../file/source/reader/OrcReadStrategy.java        |  35 +++--
 .../file/source/reader/ParquetReadStrategy.java    |  28 ++--
 .../file/source/reader/TextReadStrategy.java       | 114 ++++++++++------
 .../seatunnel/file/writer/OrcReadStrategyTest.java |  57 +++++++-
 .../file/writer/ParquetReadStrategyTest.java       |  90 ++++++++++++-
 .../src/test/resources/hive.parquet                | Bin 0 -> 3472 bytes
 .../src/test/resources/test_read_orc.conf          |  20 +++
 .../src/test/resources/test_read_parquet.conf      |  20 +++
 .../src/test/resources/test_read_parquet2.conf     |  20 +++
 .../src/test/resources/timestamp_as_int64.parquet  | Bin 0 -> 10297 bytes
 .../{test.parquet => timestamp_as_int96.parquet}   | Bin
 .../hive/commit/HiveSinkAggregatedCommitter.java   |   3 +
 .../connectors/seatunnel/hive/sink/HiveSink.java   |  36 +++++-
 .../seatunnel/hive/source/HiveSource.java          |  84 +++++++++++-
 .../seatunnel/hive/utils/HiveMetaStoreProxy.java   |  34 +----
 .../seatunnel/kafka/source/KafkaSource.java        |   3 +-
 .../e2e/connector/file/local/LocalFileIT.java      |  12 ++
 .../src/test/resources/orc/e2e.orc                 | Bin 5471 -> 5730 bytes
 ...nf => local_file_orc_projection_to_assert.conf} |  23 +---
 .../resources/orc/local_file_orc_to_assert.conf    |  22 +---
 .../local_file_parquet_projection_to_assert.conf}  |  19 +--
 .../src/test/resources/text/e2e.txt                |  10 +-
 .../local_file_text_projection_to_assert.conf}     |  39 +++++-
 .../format/text/TextDeserializationSchema.java     | 144 +++++++++++++--------
 .../format/text/TextSerializationSchema.java       | 114 ++++++++++++++--
 .../format/text/constant/TextFormatConstant.java   |  28 +---
 .../format/text/TextFormatSchemaTest.java          |  60 ++++++---
 47 files changed, 922 insertions(+), 344 deletions(-)

diff --git a/docs/en/connector-v2/Error-Quick-Reference-Manual.md 
b/docs/en/connector-v2/Error-Quick-Reference-Manual.md
index 01871328c..1c1fbb456 100644
--- a/docs/en/connector-v2/Error-Quick-Reference-Manual.md
+++ b/docs/en/connector-v2/Error-Quick-Reference-Manual.md
@@ -155,6 +155,7 @@ problems encountered by users.
 | FILE-01 | File type is invalid        | When users encounter this error 
code, it means that the this file is not the format that user assigned, please 
check it                                          |
 | FILE-02 | Data deserialization failed | When users encounter this error 
code, it means that data from files not satisfied the schema that user 
assigned, please check data from files whether is correct |
 | FILE-03 | Get file list failed        | When users encounter this error 
code, it means that connector try to traverse the path and get file list 
failed, please check file system whether is work        |
+| FILE-04 | File list is empty          | When users encounter this error 
code, it means that the path user want to sync is empty, please check file path 
                                                 |
 
 ## Doris Connector Error Codes
 
diff --git a/docs/en/connector-v2/source/FtpFile.md 
b/docs/en/connector-v2/source/FtpFile.md
index 0edf6cbc1..124fac7a0 100644
--- a/docs/en/connector-v2/source/FtpFile.md
+++ b/docs/en/connector-v2/source/FtpFile.md
@@ -19,7 +19,7 @@ If you use SeaTunnel Engine, It automatically integrated the 
hadoop jar when you
 - [x] [batch](../../concept/connector-v2-features.md)
 - [ ] [stream](../../concept/connector-v2-features.md)
 - [ ] [exactly-once](../../concept/connector-v2-features.md)
-- [ ] [column projection](../../concept/connector-v2-features.md)
+- [x] [column projection](../../concept/connector-v2-features.md)
 - [x] [parallelism](../../concept/connector-v2-features.md)
 - [ ] [support user-defined split](../../concept/connector-v2-features.md)
 - [x] file format
@@ -37,6 +37,7 @@ If you use SeaTunnel Engine, It automatically integrated the 
hadoop jar when you
 | password                  | string  | yes      | -                   |
 | path                      | string  | yes      | -                   |
 | type                      | string  | yes      | -                   |
+| read_columns              | list    | no       | -                   |
 | delimiter                 | string  | no       | \001                |
 | parse_partition_from_path | boolean | no       | true                |
 | date_format               | string  | no       | yyyy-MM-dd          |
@@ -124,6 +125,20 @@ then Seatunnel will skip the first 2 lines from source 
files
 
 The schema information of upstream data.
 
+### read_columns [list]
+
+The list of columns to read from the data source. Users can use it to implement field 
projection.
+
+The following file types support column projection:
+
+- text
+- json
+- csv
+- orc
+- parquet
+
+**Tips: If the user wants to use this feature when reading `text` `json` `csv` 
files, the schema option must be configured**
+
 ### type [string]
 
 File type, supported as the following file types:
diff --git a/docs/en/connector-v2/source/HdfsFile.md 
b/docs/en/connector-v2/source/HdfsFile.md
index e82831675..cde7be449 100644
--- a/docs/en/connector-v2/source/HdfsFile.md
+++ b/docs/en/connector-v2/source/HdfsFile.md
@@ -22,7 +22,7 @@ If you use SeaTunnel Engine, It automatically integrated the 
hadoop jar when you
 
 Read all the data in a split in a pollNext call. What splits are read will be 
saved in snapshot.
 
-- [ ] [column projection](../../concept/connector-v2-features.md)
+- [x] [column projection](../../concept/connector-v2-features.md)
 - [x] [parallelism](../../concept/connector-v2-features.md)
 - [ ] [support user-defined split](../../concept/connector-v2-features.md)
 - [x] file format
@@ -39,6 +39,7 @@ Read all the data in a split in a pollNext call. What splits 
are read will be sa
 | path                      | string  | yes      | -                   |
 | type                      | string  | yes      | -                   |
 | fs.defaultFS              | string  | yes      | -                   |
+| read_columns              | list    | no       | -                   |
 | hdfs_site_path            | string  | no       | -                   |
 | delimiter                 | string  | no       | \001                |
 | parse_partition_from_path | boolean | no       | true                |
@@ -219,6 +220,20 @@ The keytab path of kerberos
 
 the schema fields of upstream data
 
+### read_columns [list]
+
+The list of columns to read from the data source. Users can use it to implement field 
projection.
+
+The following file types support column projection:
+
+- text
+- json
+- csv
+- orc
+- parquet
+
+**Tips: If the user wants to use this feature when reading `text` `json` `csv` 
files, the schema option must be configured**
+
 ### common options
 
 Source plugin common parameters, please refer to [Source Common 
Options](common-options.md) for details.
diff --git a/docs/en/connector-v2/source/Hive.md 
b/docs/en/connector-v2/source/Hive.md
index fc0023539..413473c7f 100644
--- a/docs/en/connector-v2/source/Hive.md
+++ b/docs/en/connector-v2/source/Hive.md
@@ -41,6 +41,7 @@ Read all the data in a split in a pollNext call. What splits 
are read will be sa
 | kerberos_keytab_path | string | no       | -             |
 | hdfs_site_path       | string | no       | -             |
 | read_partitions      | list   | no       | -             |
+| read_columns         | list   | no       | -             |
 | common-options       |        | no       | -             |
 
 ### table_name [string]
@@ -70,6 +71,10 @@ The principal of kerberos authentication
 
 The keytab file path of kerberos authentication
 
+### read_columns [list]
+
+The list of columns to read from the data source. Users can use it to implement field 
projection.
+
 ### common options
 
 Source plugin common parameters, please refer to [Source Common 
Options](common-options.md) for details
diff --git a/docs/en/connector-v2/source/LocalFile.md 
b/docs/en/connector-v2/source/LocalFile.md
index a2f15805f..6a74d18ab 100644
--- a/docs/en/connector-v2/source/LocalFile.md
+++ b/docs/en/connector-v2/source/LocalFile.md
@@ -38,6 +38,7 @@ Read all the data in a split in a pollNext call. What splits 
are read will be sa
 |---------------------------|---------|----------|---------------------|
 | path                      | string  | yes      | -                   |
 | type                      | string  | yes      | -                   |
+| read_columns              | list    | no       | -                   |
 | delimiter                 | string  | no       | \001                |
 | parse_partition_from_path | boolean | no       | true                |
 | date_format               | string  | no       | yyyy-MM-dd          |
@@ -199,6 +200,20 @@ connector will generate data as the following:
 
 The schema information of upstream data.
 
+### read_columns [list]
+
+The list of columns to read from the data source. Users can use it to implement field 
projection.
+
+The following file types support column projection:
+
+- text
+- json
+- csv
+- orc
+- parquet
+
+**Tips: If the user wants to use this feature when reading `text` `json` `csv` 
files, the schema option must be configured**
+
 ### common options
 
 Source plugin common parameters, please refer to [Source Common 
Options](common-options.md) for details
diff --git a/docs/en/connector-v2/source/OssFile.md 
b/docs/en/connector-v2/source/OssFile.md
index a822fa11a..0ca6eeb50 100644
--- a/docs/en/connector-v2/source/OssFile.md
+++ b/docs/en/connector-v2/source/OssFile.md
@@ -25,7 +25,7 @@ It only supports hadoop version **2.9.X+**.
 
 Read all the data in a split in a pollNext call. What splits are read will be 
saved in snapshot.
 
-- [ ] [column projection](../../concept/connector-v2-features.md)
+- [x] [column projection](../../concept/connector-v2-features.md)
 - [x] [parallelism](../../concept/connector-v2-features.md)
 - [ ] [support user-defined split](../../concept/connector-v2-features.md)
 - [x] file format
@@ -45,6 +45,7 @@ Read all the data in a split in a pollNext call. What splits 
are read will be sa
 | access_key                | string  | yes      | -                   |
 | access_secret             | string  | yes      | -                   |
 | endpoint                  | string  | yes      | -                   |
+| read_columns              | list    | no       | -                   |
 | delimiter                 | string  | no       | \001                |
 | parse_partition_from_path | boolean | no       | true                |
 | skip_header_row_number    | long    | no       | 0                   |
@@ -222,6 +223,20 @@ The endpoint of oss file system.
 
 The schema of upstream data.
 
+### read_columns [list]
+
+The list of columns to read from the data source. Users can use it to implement field 
projection.
+
+The following file types support column projection:
+
+- text
+- json
+- csv
+- orc
+- parquet
+
+**Tips: If the user wants to use this feature when reading `text` `json` `csv` 
files, the schema option must be configured**
+
 ### common options
 
 Source plugin common parameters, please refer to [Source Common 
Options](common-options.md) for details.
diff --git a/docs/en/connector-v2/source/OssJindoFile.md 
b/docs/en/connector-v2/source/OssJindoFile.md
index 9c966fe39..fef93da5b 100644
--- a/docs/en/connector-v2/source/OssJindoFile.md
+++ b/docs/en/connector-v2/source/OssJindoFile.md
@@ -45,6 +45,7 @@ Read all the data in a split in a pollNext call. What splits 
are read will be sa
 | access_key                | string  | yes      | -                   |
 | access_secret             | string  | yes      | -                   |
 | endpoint                  | string  | yes      | -                   |
+| read_columns              | list    | no       | -                   |
 | delimiter                 | string  | no       | \001                |
 | parse_partition_from_path | boolean | no       | true                |
 | date_format               | string  | no       | yyyy-MM-dd          |
@@ -222,6 +223,20 @@ The endpoint of oss file system.
 
 The schema of upstream data.
 
+### read_columns [list]
+
+The list of columns to read from the data source. Users can use it to implement field 
projection.
+
+The following file types support column projection:
+
+- text
+- json
+- csv
+- orc
+- parquet
+
+**Tips: If the user wants to use this feature when reading `text` `json` `csv` 
files, the schema option must be configured**
+
 ### common options
 
 Source plugin common parameters, please refer to [Source Common 
Options](common-options.md) for details.
diff --git a/docs/en/connector-v2/source/S3File.md 
b/docs/en/connector-v2/source/S3File.md
index 27fe779a6..65611e770 100644
--- a/docs/en/connector-v2/source/S3File.md
+++ b/docs/en/connector-v2/source/S3File.md
@@ -24,7 +24,7 @@ To use this connector you need put hadoop-aws-3.1.4.jar and 
aws-java-sdk-bundle-
 
 Read all the data in a split in a pollNext call. What splits are read will be 
saved in snapshot.
 
-- [ ] [column projection](../../concept/connector-v2-features.md)
+- [x] [column projection](../../concept/connector-v2-features.md)
 - [x] [parallelism](../../concept/connector-v2-features.md)
 - [ ] [support user-defined split](../../concept/connector-v2-features.md)
 - [x] file format
@@ -43,6 +43,7 @@ Read all the data in a split in a pollNext call. What splits 
are read will be sa
 | bucket                          | string  | yes      | -                     
                                |
 | fs.s3a.endpoint                 | string  | yes      | -                     
                                |
 | fs.s3a.aws.credentials.provider | string  | yes      | 
com.amazonaws.auth.InstanceProfileCredentialsProvider |
+| read_columns                    | list    | no       | -                     
                                |
 | access_key                      | string  | no       | -                     
                                |
 | access_secret                   | string  | no       | -                     
                                |
 | hadoop_s3_properties            | map     | no       | -                     
                                |
@@ -239,6 +240,20 @@ hadoop_s3_properties {
 
 The schema of upstream data.
 
+### read_columns [list]
+
+The list of columns to read from the data source. Users can use it to implement field 
projection.
+
+The following file types support column projection:
+
+- text
+- json
+- csv
+- orc
+- parquet
+
+**Tips: If the user wants to use this feature when reading `text` `json` `csv` 
files, the schema option must be configured**
+
 ### common options
 
 Source plugin common parameters, please refer to [Source Common 
Options](common-options.md) for details.
diff --git a/docs/en/connector-v2/source/SftpFile.md 
b/docs/en/connector-v2/source/SftpFile.md
index 80d77deaf..1563874ae 100644
--- a/docs/en/connector-v2/source/SftpFile.md
+++ b/docs/en/connector-v2/source/SftpFile.md
@@ -19,7 +19,7 @@ If you use SeaTunnel Engine, It automatically integrated the 
hadoop jar when you
 - [x] [batch](../../concept/connector-v2-features.md)
 - [ ] [stream](../../concept/connector-v2-features.md)
 - [ ] [exactly-once](../../concept/connector-v2-features.md)
-- [ ] [column projection](../../concept/connector-v2-features.md)
+- [x] [column projection](../../concept/connector-v2-features.md)
 - [x] [parallelism](../../concept/connector-v2-features.md)
 - [ ] [support user-defined split](../../concept/connector-v2-features.md)
 - [x] file format
@@ -124,6 +124,20 @@ then Seatunnel will skip the first 2 lines from source 
files
 
 The schema information of upstream data.
 
+### read_columns [list]
+
+The list of columns to read from the data source. Users can use it to implement field 
projection.
+
+The following file types support column projection:
+
+- text
+- json
+- csv
+- orc
+- parquet
+
+**Tips: If the user wants to use this feature when reading `text` `json` `csv` 
files, the schema option must be configured**
+
 ### type [string]
 
 File type, supported as the following file types:
diff --git a/release-note.md b/release-note.md
index d0a337566..0ed18a641 100644
--- a/release-note.md
+++ b/release-note.md
@@ -38,6 +38,8 @@
 - [API]Add parallelism and column projection interface #3829
 - [API]Add get source method to all source connector #3846
 - [Hive] Support read user-defined partitions #3842
+- [Hive] Support read text table & Column projection #4105
+- [File] Support column projection #4105
 ### Zeta Engine
 - [Chore] Remove unnecessary dependencies #3795
 - [Core] Improve job restart of all node down #3784
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
index c78c3fdba..70f159afe 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
@@ -78,6 +78,12 @@ public abstract class BaseHdfsFileSource extends 
BaseFileSource {
             throw new FileConnectorException(
                     FileConnectorErrorCode.FILE_LIST_GET_FAILED, errorMsg, e);
         }
+        if (filePaths.isEmpty()) {
+            throw new FileConnectorException(
+                    FileConnectorErrorCode.FILE_LIST_EMPTY,
+                    "The target file list is empty,"
+                            + "SeaTunnel will not be able to sync empty 
table");
+        }
         // support user-defined schema
         FileFormat fileFormat =
                 FileFormat.valueOf(
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java
index 2533b10f1..43403f632 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.api.configuration.Options;
 import org.apache.seatunnel.common.utils.DateTimeUtils;
 import org.apache.seatunnel.common.utils.DateUtils;
 import org.apache.seatunnel.common.utils.TimeUtils;
+import org.apache.seatunnel.format.text.constant.TextFormatConstant;
 
 import java.util.Arrays;
 import java.util.List;
@@ -31,7 +32,7 @@ public class BaseSinkConfig {
     public static final String NON_PARTITION = "NON_PARTITION";
     public static final String TRANSACTION_ID_SPLIT = "_";
     public static final String TRANSACTION_EXPRESSION = "transactionId";
-    public static final String DEFAULT_FIELD_DELIMITER = 
String.valueOf('\001');
+    public static final String DEFAULT_FIELD_DELIMITER = 
TextFormatConstant.SEPARATOR[0];
     public static final String DEFAULT_ROW_DELIMITER = "\n";
     public static final String DEFAULT_PARTITION_DIR_EXPRESSION =
             "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/";
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java
index 5b8ea6c16..747b972aa 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.api.configuration.Options;
 import org.apache.seatunnel.common.utils.DateTimeUtils;
 import org.apache.seatunnel.common.utils.DateUtils;
 import org.apache.seatunnel.common.utils.TimeUtils;
+import org.apache.seatunnel.format.text.constant.TextFormatConstant;
 
 import java.util.List;
 
@@ -41,7 +42,7 @@ public class BaseSourceConfig {
     public static final Option<String> DELIMITER =
             Options.key("delimiter")
                     .stringType()
-                    .defaultValue(String.valueOf('\001'))
+                    .defaultValue(TextFormatConstant.SEPARATOR[0])
                     .withDescription(
                             "The separator between columns in a row of data. 
Only needed by `text` file format");
 
@@ -98,4 +99,10 @@ public class BaseSourceConfig {
                     .listType()
                     .noDefaultValue()
                     .withDescription("The partitions that the user want to 
read");
+
+    public static final Option<List<String>> READ_COLUMNS =
+            Options.key("read_columns")
+                    .listType()
+                    .noDefaultValue()
+                    .withDescription("The columns list that the user want to 
read");
 }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/exception/FileConnectorErrorCode.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/exception/FileConnectorErrorCode.java
index e966ebcd5..65e9590f3 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/exception/FileConnectorErrorCode.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/exception/FileConnectorErrorCode.java
@@ -22,7 +22,8 @@ import 
org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
 public enum FileConnectorErrorCode implements SeaTunnelErrorCode {
     FILE_TYPE_INVALID("FILE-01", "File type is invalid"),
     DATA_DESERIALIZE_FAILED("FILE-02", "Data deserialization failed"),
-    FILE_LIST_GET_FAILED("FILE-03", "Get file list failed");
+    FILE_LIST_GET_FAILED("FILE-03", "Get file list failed"),
+    FILE_LIST_EMPTY("FILE-04", "File list is empty");
 
     private final String code;
     private final String description;
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/util/FileSystemUtils.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/util/FileSystemUtils.java
index 15b05796e..b0993f17a 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/util/FileSystemUtils.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/util/FileSystemUtils.java
@@ -21,12 +21,14 @@ import 
org.apache.seatunnel.common.exception.CommonErrorCode;
 import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
 import 
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
 
 import lombok.NonNull;
 import lombok.extern.slf4j.Slf4j;
@@ -49,12 +51,44 @@ public class FileSystemUtils implements Serializable {
         this.hadoopConf = hadoopConf;
     }
 
+    public static void doKerberosAuthentication(
+            Configuration configuration, String principal, String keytabPath) {
+        if (StringUtils.isBlank(principal) || StringUtils.isBlank(keytabPath)) 
{
+            log.warn(
+                    "Principal [{}] or keytabPath [{}] is empty, it will skip 
kerberos authentication",
+                    principal,
+                    keytabPath);
+        } else {
+            configuration.set("hadoop.security.authentication", "kerberos");
+            UserGroupInformation.setConfiguration(configuration);
+            try {
+                log.info(
+                        "Start Kerberos authentication using principal {} and 
keytab {}",
+                        principal,
+                        keytabPath);
+                UserGroupInformation.loginUserFromKeytab(principal, 
keytabPath);
+                log.info("Kerberos authentication successful");
+            } catch (IOException e) {
+                String errorMsg =
+                        String.format(
+                                "Kerberos authentication failed using this "
+                                        + "principal [%s] and keytab path 
[%s]",
+                                principal, keytabPath);
+                throw new FileConnectorException(
+                        CommonErrorCode.KERBEROS_AUTHORIZED_FAILED, errorMsg, 
e);
+            }
+        }
+    }
+
     public Configuration getConfiguration(HadoopConf hadoopConf) {
         Configuration configuration = new Configuration();
         configuration.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, 
hadoopConf.getHdfsNameKey());
         configuration.set(
                 String.format("fs.%s.impl", hadoopConf.getSchema()), 
hadoopConf.getFsHdfsImpl());
         hadoopConf.setExtraOptionsForConfiguration(configuration);
+        String principal = hadoopConf.getKerberosPrincipal();
+        String keytabPath = hadoopConf.getKerberosKeytabPath();
+        doKerberosAuthentication(configuration, principal, keytabPath);
         return configuration;
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
index d0078f18b..6820d28d8 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
@@ -37,7 +37,6 @@ import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
-import org.apache.hadoop.security.UserGroupInformation;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -154,31 +153,8 @@ public abstract class AbstractWriteStrategy implements 
WriteStrategy {
         this.hadoopConf.setExtraOptionsForConfiguration(configuration);
         String principal = hadoopConf.getKerberosPrincipal();
         String keytabPath = hadoopConf.getKerberosKeytabPath();
-        if (!isKerberosAuthorization && StringUtils.isNotBlank(principal)) {
-            // kerberos authentication and only once
-            if (StringUtils.isBlank(keytabPath)) {
-                throw new FileConnectorException(
-                        CommonErrorCode.KERBEROS_AUTHORIZED_FAILED,
-                        "Kerberos keytab path is blank, please check this 
parameter that in your config file");
-            }
-            configuration.set("hadoop.security.authentication", "kerberos");
-            UserGroupInformation.setConfiguration(configuration);
-            try {
-                log.info(
-                        "Start Kerberos authentication using principal {} and 
keytab {}",
-                        principal,
-                        keytabPath);
-                UserGroupInformation.loginUserFromKeytab(principal, 
keytabPath);
-                log.info("Kerberos authentication successful");
-            } catch (IOException e) {
-                String errorMsg =
-                        String.format(
-                                "Kerberos authentication failed using this "
-                                        + "principal [%s] and keytab path 
[%s]",
-                                principal, keytabPath);
-                throw new FileConnectorException(
-                        CommonErrorCode.KERBEROS_AUTHORIZED_FAILED, errorMsg, 
e);
-            }
+        if (!isKerberosAuthorization) {
+            FileSystemUtils.doKerberosAuthentication(configuration, principal, 
keytabPath);
             isKerberosAuthorization = true;
         }
         return configuration;
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java
index 75c9ae34b..551d02f5b 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java
@@ -135,7 +135,7 @@ public class OrcWriteStrategy extends AbstractWriteStrategy 
{
         return writer;
     }
 
-    private TypeDescription buildFieldWithRowType(SeaTunnelDataType<?> type) {
+    public static TypeDescription buildFieldWithRowType(SeaTunnelDataType<?> 
type) {
         switch (type.getSqlType()) {
             case ARRAY:
                 BasicType<?> elementType = ((ArrayType<?, ?>) 
type).getElementType();
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ParquetWriteStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ParquetWriteStrategy.java
index 17966c9d1..ce104da80 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ParquetWriteStrategy.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ParquetWriteStrategy.java
@@ -224,7 +224,7 @@ public class ParquetWriteStrategy extends 
AbstractWriteStrategy {
     }
 
     @SuppressWarnings("checkstyle:MagicNumber")
-    private Type seaTunnelDataType2ParquetDataType(
+    public static Type seaTunnelDataType2ParquetDataType(
             String fieldName, SeaTunnelDataType<?> seaTunnelDataType) {
         switch (seaTunnelDataType.getSqlType()) {
             case ARRAY:
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/BaseFileSource.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/BaseFileSource.java
index d0596017f..c6034f06b 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/BaseFileSource.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/BaseFileSource.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.source.SourceReader;
 import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.source.SupportColumnProjection;
 import org.apache.seatunnel.api.source.SupportParallelism;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -35,7 +36,8 @@ import java.util.List;
 
 public abstract class BaseFileSource
         implements SeaTunnelSource<SeaTunnelRow, FileSourceSplit, 
FileSourceState>,
-                SupportParallelism {
+                SupportParallelism,
+                SupportColumnProjection {
     protected SeaTunnelRowType rowType;
     protected ReadStrategy readStrategy;
     protected HadoopConf hadoopConf;
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
index 35c14670c..0bd638a42 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
@@ -22,18 +22,15 @@ import 
org.apache.seatunnel.shade.com.typesafe.config.Config;
 import org.apache.seatunnel.api.table.type.BasicType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.exception.CommonErrorCode;
 import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfig;
 import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
-import 
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils;
 
-import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.security.UserGroupInformation;
 
 import lombok.extern.slf4j.Slf4j;
 
@@ -72,6 +69,7 @@ public abstract class AbstractReadStrategy implements 
ReadStrategy {
     protected Config pluginConfig;
     protected List<String> fileNames = new ArrayList<>();
     protected List<String> readPartitions = new ArrayList<>();
+    protected List<String> readColumns = new ArrayList<>();
     protected boolean isMergePartition = true;
     protected long skipHeaderNumber = 
BaseSourceConfig.SKIP_HEADER_ROW_NUMBER.defaultValue();
     protected boolean isKerberosAuthorization = false;
@@ -94,38 +92,15 @@ public abstract class AbstractReadStrategy implements 
ReadStrategy {
         configuration.setBoolean(READ_INT96_AS_FIXED, true);
         configuration.setBoolean(WRITE_FIXED_AS_INT96, true);
         configuration.setBoolean(ADD_LIST_ELEMENT_RECORDS, false);
-        configuration.setBoolean(WRITE_OLD_LIST_STRUCTURE, false);
+        configuration.setBoolean(WRITE_OLD_LIST_STRUCTURE, true);
         configuration.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, 
hadoopConf.getHdfsNameKey());
         configuration.set(
                 String.format("fs.%s.impl", hadoopConf.getSchema()), 
hadoopConf.getFsHdfsImpl());
         hadoopConf.setExtraOptionsForConfiguration(configuration);
         String principal = hadoopConf.getKerberosPrincipal();
         String keytabPath = hadoopConf.getKerberosKeytabPath();
-        if (!isKerberosAuthorization && StringUtils.isNotBlank(principal)) {
-            // kerberos authentication and only once
-            if (StringUtils.isBlank(keytabPath)) {
-                throw new FileConnectorException(
-                        CommonErrorCode.KERBEROS_AUTHORIZED_FAILED,
-                        "Kerberos keytab path is blank, please check this 
parameter that in your config file");
-            }
-            configuration.set("hadoop.security.authentication", "kerberos");
-            UserGroupInformation.setConfiguration(configuration);
-            try {
-                log.info(
-                        "Start Kerberos authentication using principal {} and 
keytab {}",
-                        principal,
-                        keytabPath);
-                UserGroupInformation.loginUserFromKeytab(principal, 
keytabPath);
-                log.info("Kerberos authentication successful");
-            } catch (IOException e) {
-                String errorMsg =
-                        String.format(
-                                "Kerberos authentication failed using this "
-                                        + "principal [%s] and keytab path 
[%s]",
-                                principal, keytabPath);
-                throw new FileConnectorException(
-                        CommonErrorCode.KERBEROS_AUTHORIZED_FAILED, errorMsg, 
e);
-            }
+        if (!isKerberosAuthorization) {
+            FileSystemUtils.doKerberosAuthentication(configuration, principal, 
keytabPath);
             isKerberosAuthorization = true;
         }
         return configuration;
@@ -187,6 +162,9 @@ public abstract class AbstractReadStrategy implements 
ReadStrategy {
             readPartitions.addAll(
                     
pluginConfig.getStringList(BaseSourceConfig.READ_PARTITIONS.key()));
         }
+        if (pluginConfig.hasPath(BaseSourceConfig.READ_COLUMNS.key())) {
+            
readColumns.addAll(pluginConfig.getStringList(BaseSourceConfig.READ_COLUMNS.key()));
+        }
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/OrcReadStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/OrcReadStrategy.java
index d23d4222f..d191c3a83 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/OrcReadStrategy.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/OrcReadStrategy.java
@@ -67,6 +67,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static 
org.apache.seatunnel.connectors.seatunnel.file.sink.writer.OrcWriteStrategy.buildFieldWithRowType;
+
 @Slf4j
 public class OrcReadStrategy extends AbstractReadStrategy {
     private static final long MIN_SIZE = 16 * 1024;
@@ -86,10 +88,15 @@ public class OrcReadStrategy extends AbstractReadStrategy {
         Map<String, String> partitionsMap = parsePartitionsByPath(path);
         OrcFile.ReaderOptions readerOptions = 
OrcFile.readerOptions(configuration);
         try (Reader reader = OrcFile.createReader(filePath, readerOptions)) {
-            TypeDescription schema = reader.getSchema();
+            TypeDescription schema = TypeDescription.createStruct();
+            for (int i = 0; i < seaTunnelRowType.getTotalFields(); i++) {
+                TypeDescription typeDescription =
+                        
buildFieldWithRowType(seaTunnelRowType.getFieldType(i));
+                schema.addField(seaTunnelRowType.getFieldName(i), 
typeDescription);
+            }
             List<TypeDescription> children = schema.getChildren();
-            RecordReader rows = reader.rows();
-            VectorizedRowBatch rowBatch = reader.getSchema().createRowBatch();
+            RecordReader rows = reader.rows(reader.options().schema(schema));
+            VectorizedRowBatch rowBatch = schema.createRowBatch();
             while (rows.nextBatch(rowBatch)) {
                 int num = 0;
                 for (int i = 0; i < rowBatch.size; i++) {
@@ -128,11 +135,23 @@ public class OrcReadStrategy extends AbstractReadStrategy 
{
         Path dstDir = new Path(path);
         try (Reader reader = OrcFile.createReader(dstDir, readerOptions)) {
             TypeDescription schema = reader.getSchema();
-            String[] fields = new String[schema.getFieldNames().size()];
-            SeaTunnelDataType<?>[] types = new 
SeaTunnelDataType[schema.getFieldNames().size()];
-            for (int i = 0; i < schema.getFieldNames().size(); i++) {
-                fields[i] = schema.getFieldNames().get(i);
-                types[i] = 
orcDataType2SeaTunnelDataType(schema.getChildren().get(i));
+            List<String> fieldNames = schema.getFieldNames();
+            if (readColumns.isEmpty()) {
+                readColumns.addAll(fieldNames);
+            }
+            String[] fields = new String[readColumns.size()];
+            SeaTunnelDataType<?>[] types = new 
SeaTunnelDataType[readColumns.size()];
+            for (int i = 0; i < readColumns.size(); i++) {
+                fields[i] = readColumns.get(i);
+                int index = fieldNames.indexOf(readColumns.get(i));
+                if (index == -1) {
+                    throw new FileConnectorException(
+                            CommonErrorCode.TABLE_SCHEMA_GET_FAILED,
+                            String.format(
+                                    "Column [%s] does not exists in table 
schema [%s]",
+                                    readColumns.get(i), String.join(",", 
fieldNames)));
+                }
+                types[i] = 
orcDataType2SeaTunnelDataType(schema.getChildren().get(index));
             }
             seaTunnelRowType = new SeaTunnelRowType(fields, types);
             seaTunnelRowTypeWithPartition = mergePartitionTypes(path, 
seaTunnelRowType);
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
index 7762cbfa9..424e181f8 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
@@ -78,6 +78,8 @@ public class ParquetReadStrategy extends AbstractReadStrategy 
{
     private static final long MILLIS_PER_DAY = TimeUnit.DAYS.toMillis(1L);
     private static final long JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH = 2440588;
 
+    private int[] indexes;
+
     @Override
     public void read(String path, Collector<SeaTunnelRow> output)
             throws FileConnectorException, IOException {
@@ -113,7 +115,7 @@ public class ParquetReadStrategy extends 
AbstractReadStrategy {
                     fields = new Object[fieldsCount];
                 }
                 for (int i = 0; i < fieldsCount; i++) {
-                    Object data = record.get(i);
+                    Object data = record.get(indexes[i]);
                     fields[i] = resolveObject(data, 
seaTunnelRowType.getFieldType(i));
                 }
                 SeaTunnelRow seaTunnelRow = new SeaTunnelRow(fields);
@@ -236,15 +238,21 @@ public class ParquetReadStrategy extends 
AbstractReadStrategy {
             throw new 
FileConnectorException(CommonErrorCode.READER_OPERATION_FAILED, errorMsg, e);
         }
         FileMetaData fileMetaData = metadata.getFileMetaData();
-        MessageType schema = fileMetaData.getSchema();
-        int fieldCount = schema.getFieldCount();
-        String[] fields = new String[fieldCount];
-        SeaTunnelDataType<?>[] types = new SeaTunnelDataType[fieldCount];
-        for (int i = 0; i < fieldCount; i++) {
-            fields[i] = schema.getFieldName(i);
-            Type type = schema.getType(i);
-            SeaTunnelDataType<?> fieldType = parquetType2SeaTunnelType(type);
-            types[i] = fieldType;
+        MessageType originalSchema = fileMetaData.getSchema();
+        if (readColumns.isEmpty()) {
+            for (int i = 0; i < originalSchema.getFieldCount(); i++) {
+                readColumns.add(originalSchema.getFieldName(i));
+            }
+        }
+        String[] fields = new String[readColumns.size()];
+        SeaTunnelDataType<?>[] types = new 
SeaTunnelDataType[readColumns.size()];
+        indexes = new int[readColumns.size()];
+        for (int i = 0; i < readColumns.size(); i++) {
+            fields[i] = readColumns.get(i);
+            Type type = originalSchema.getType(fields[i]);
+            int fieldIndex = originalSchema.getFieldIndex(fields[i]);
+            indexes[i] = fieldIndex;
+            types[i] = parquetType2SeaTunnelType(type);
         }
         seaTunnelRowType = new SeaTunnelRowType(fields, types);
         seaTunnelRowTypeWithPartition = mergePartitionTypes(path, 
seaTunnelRowType);
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
index ae57d33dc..0ea1d7c41 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
@@ -17,8 +17,10 @@
 
 package org.apache.seatunnel.connectors.seatunnel.file.source.reader;
 
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.serialization.DeserializationSchema;
 import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.utils.DateTimeUtils;
@@ -31,6 +33,7 @@ import 
org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
 import 
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
 import 
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
 import org.apache.seatunnel.format.text.TextDeserializationSchema;
+import org.apache.seatunnel.format.text.constant.TextFormatConstant;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -44,10 +47,12 @@ import java.util.Map;
 
 public class TextReadStrategy extends AbstractReadStrategy {
     private DeserializationSchema<SeaTunnelRow> deserializationSchema;
-    private String fieldDelimiter = String.valueOf('\001');
-    private DateUtils.Formatter dateFormat = DateUtils.Formatter.YYYY_MM_DD;
-    private DateTimeUtils.Formatter datetimeFormat = 
DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS;
-    private TimeUtils.Formatter timeFormat = TimeUtils.Formatter.HH_MM_SS;
+    private String fieldDelimiter = BaseSourceConfig.DELIMITER.defaultValue();
+    private DateUtils.Formatter dateFormat = 
BaseSourceConfig.DATE_FORMAT.defaultValue();
+    private DateTimeUtils.Formatter datetimeFormat =
+            BaseSourceConfig.DATETIME_FORMAT.defaultValue();
+    private TimeUtils.Formatter timeFormat = 
BaseSourceConfig.TIME_FORMAT.defaultValue();
+    private int[] indexes;
 
     @Override
     public void read(String path, Collector<SeaTunnelRow> output)
@@ -66,6 +71,22 @@ public class TextReadStrategy extends AbstractReadStrategy {
                                 try {
                                     SeaTunnelRow seaTunnelRow =
                                             
deserializationSchema.deserialize(line.getBytes());
+                                    if (!readColumns.isEmpty()) {
+                                        // need column projection
+                                        Object[] fields;
+                                        if (isMergePartition) {
+                                            fields =
+                                                    new Object
+                                                            [readColumns.size()
+                                                                    + 
partitionsMap.size()];
+                                        } else {
+                                            fields = new 
Object[readColumns.size()];
+                                        }
+                                        for (int i = 0; i < indexes.length; 
i++) {
+                                            fields[i] = 
seaTunnelRow.getField(indexes[i]);
+                                        }
+                                        seaTunnelRow = new 
SeaTunnelRow(fields);
+                                    }
                                     if (isMergePartition) {
                                         int index = 
seaTunnelRowType.getTotalFields();
                                         for (String value : 
partitionsMap.values()) {
@@ -89,29 +110,35 @@ public class TextReadStrategy extends AbstractReadStrategy 
{
 
     @Override
     public SeaTunnelRowType getSeaTunnelRowTypeInfo(HadoopConf hadoopConf, 
String path) {
-        SeaTunnelRowType simpleSeaTunnelType = 
SeaTunnelSchema.buildSimpleTextSchema();
-        this.seaTunnelRowType = simpleSeaTunnelType;
+        this.seaTunnelRowType = SeaTunnelSchema.buildSimpleTextSchema();
         this.seaTunnelRowTypeWithPartition =
-                mergePartitionTypes(fileNames.get(0), simpleSeaTunnelType);
+                mergePartitionTypes(fileNames.get(0), seaTunnelRowType);
+        initFormatter();
+        if (pluginConfig.hasPath(BaseSourceConfig.READ_COLUMNS.key())) {
+            throw new FileConnectorException(
+                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+                    "When reading json/text/csv files, if user has not 
specified schema information, "
+                            + "SeaTunnel will not support column projection");
+        }
+        TextDeserializationSchema.Builder builder =
+                TextDeserializationSchema.builder()
+                        .delimiter(TextFormatConstant.PLACEHOLDER)
+                        .dateFormatter(dateFormat)
+                        .dateTimeFormatter(datetimeFormat)
+                        .timeFormatter(timeFormat);
         if (isMergePartition) {
             deserializationSchema =
-                    TextDeserializationSchema.builder()
-                            
.seaTunnelRowType(this.seaTunnelRowTypeWithPartition)
-                            .delimiter(String.valueOf('\002'))
-                            .build();
+                    
builder.seaTunnelRowType(this.seaTunnelRowTypeWithPartition).build();
         } else {
-            deserializationSchema =
-                    TextDeserializationSchema.builder()
-                            .seaTunnelRowType(this.seaTunnelRowType)
-                            .delimiter(String.valueOf('\002'))
-                            .build();
+            deserializationSchema = 
builder.seaTunnelRowType(this.seaTunnelRowType).build();
         }
         return getActualSeaTunnelRowTypeInfo();
     }
 
     @Override
     public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) {
-        super.setSeaTunnelRowTypeInfo(seaTunnelRowType);
+        SeaTunnelRowType userDefinedRowTypeWithPartition =
+                mergePartitionTypes(fileNames.get(0), seaTunnelRowType);
         if (pluginConfig.hasPath(BaseSourceConfig.DELIMITER.key())) {
             fieldDelimiter = 
pluginConfig.getString(BaseSourceConfig.DELIMITER.key());
         } else {
@@ -122,6 +149,40 @@ public class TextReadStrategy extends AbstractReadStrategy 
{
                 fieldDelimiter = ",";
             }
         }
+        initFormatter();
+        TextDeserializationSchema.Builder builder =
+                TextDeserializationSchema.builder()
+                        .delimiter(fieldDelimiter)
+                        .dateFormatter(dateFormat)
+                        .dateTimeFormatter(datetimeFormat)
+                        .timeFormatter(timeFormat);
+        if (isMergePartition) {
+            deserializationSchema =
+                    
builder.seaTunnelRowType(userDefinedRowTypeWithPartition).build();
+        } else {
+            deserializationSchema = 
builder.seaTunnelRowType(seaTunnelRowType).build();
+        }
+        // column projection
+        if (pluginConfig.hasPath(BaseSourceConfig.READ_COLUMNS.key())) {
+            // get the read column index from user-defined row type
+            indexes = new int[readColumns.size()];
+            String[] fields = new String[readColumns.size()];
+            SeaTunnelDataType<?>[] types = new 
SeaTunnelDataType[readColumns.size()];
+            for (int i = 0; i < indexes.length; i++) {
+                indexes[i] = seaTunnelRowType.indexOf(readColumns.get(i));
+                fields[i] = seaTunnelRowType.getFieldName(indexes[i]);
+                types[i] = seaTunnelRowType.getFieldType(indexes[i]);
+            }
+            this.seaTunnelRowType = new SeaTunnelRowType(fields, types);
+            this.seaTunnelRowTypeWithPartition =
+                    mergePartitionTypes(fileNames.get(0), 
this.seaTunnelRowType);
+        } else {
+            this.seaTunnelRowType = seaTunnelRowType;
+            this.seaTunnelRowTypeWithPartition = 
userDefinedRowTypeWithPartition;
+        }
+    }
+
+    private void initFormatter() {
         if (pluginConfig.hasPath(BaseSourceConfig.DATE_FORMAT.key())) {
             dateFormat =
                     DateUtils.Formatter.parse(
@@ -137,24 +198,5 @@ public class TextReadStrategy extends AbstractReadStrategy 
{
                     TimeUtils.Formatter.parse(
                             
pluginConfig.getString(BaseSourceConfig.TIME_FORMAT.key()));
         }
-        if (isMergePartition) {
-            deserializationSchema =
-                    TextDeserializationSchema.builder()
-                            
.seaTunnelRowType(this.seaTunnelRowTypeWithPartition)
-                            .delimiter(fieldDelimiter)
-                            .dateFormatter(dateFormat)
-                            .dateTimeFormatter(datetimeFormat)
-                            .timeFormatter(timeFormat)
-                            .build();
-        } else {
-            deserializationSchema =
-                    TextDeserializationSchema.builder()
-                            .seaTunnelRowType(this.seaTunnelRowType)
-                            .delimiter(fieldDelimiter)
-                            .dateFormatter(dateFormat)
-                            .dateTimeFormatter(datetimeFormat)
-                            .timeFormatter(timeFormat)
-                            .build();
-        }
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/OrcReadStrategyTest.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/OrcReadStrategyTest.java
index 26f4b267a..5d72ae2e0 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/OrcReadStrategyTest.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/OrcReadStrategyTest.java
@@ -17,16 +17,23 @@
 
 package org.apache.seatunnel.connectors.seatunnel.file.writer;
 
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+
 import org.apache.seatunnel.api.source.Collector;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
 import 
org.apache.seatunnel.connectors.seatunnel.file.source.reader.OrcReadStrategy;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
+import java.io.File;
 import java.net.URL;
 import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
 
 import static 
org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_DEFAULT;
 
@@ -34,23 +41,63 @@ public class OrcReadStrategyTest {
 
     @Test
     public void testOrcRead() throws Exception {
-        URL resource = OrcReadStrategyTest.class.getResource("/test.orc");
-        assert resource != null;
-        String path = Paths.get(resource.toURI()).toString();
+        URL orcFile = OrcReadStrategyTest.class.getResource("/test.orc");
+        Assertions.assertNotNull(orcFile);
+        String orcFilePath = Paths.get(orcFile.toURI()).toString();
         OrcReadStrategy orcReadStrategy = new OrcReadStrategy();
         LocalConf localConf = new LocalConf(FS_DEFAULT_NAME_DEFAULT);
         orcReadStrategy.init(localConf);
         TestCollector testCollector = new TestCollector();
-        orcReadStrategy.read(path, testCollector);
+        SeaTunnelRowType seaTunnelRowTypeInfo =
+                orcReadStrategy.getSeaTunnelRowTypeInfo(localConf, 
orcFilePath);
+        Assertions.assertNotNull(seaTunnelRowTypeInfo);
+        System.out.println(seaTunnelRowTypeInfo);
+        orcReadStrategy.read(orcFilePath, testCollector);
+        for (SeaTunnelRow row : testCollector.getRows()) {
+            Assertions.assertEquals(row.getField(0).getClass(), Boolean.class);
+            Assertions.assertEquals(row.getField(1).getClass(), Byte.class);
+            Assertions.assertEquals(row.getField(16).getClass(), 
SeaTunnelRow.class);
+        }
+    }
+
+    @Test
+    public void testOrcReadProjection() throws Exception {
+        URL orcFile = OrcReadStrategyTest.class.getResource("/test.orc");
+        URL conf = 
OrcReadStrategyTest.class.getResource("/test_read_orc.conf");
+        Assertions.assertNotNull(orcFile);
+        Assertions.assertNotNull(conf);
+        String orcFilePath = Paths.get(orcFile.toURI()).toString();
+        String confPath = Paths.get(conf.toURI()).toString();
+        OrcReadStrategy orcReadStrategy = new OrcReadStrategy();
+        LocalConf localConf = new LocalConf(FS_DEFAULT_NAME_DEFAULT);
+        Config pluginConfig = ConfigFactory.parseFile(new File(confPath));
+        orcReadStrategy.init(localConf);
+        orcReadStrategy.setPluginConfig(pluginConfig);
+        TestCollector testCollector = new TestCollector();
+        SeaTunnelRowType seaTunnelRowTypeInfo =
+                orcReadStrategy.getSeaTunnelRowTypeInfo(localConf, 
orcFilePath);
+        Assertions.assertNotNull(seaTunnelRowTypeInfo);
+        System.out.println(seaTunnelRowTypeInfo);
+        orcReadStrategy.read(orcFilePath, testCollector);
+        for (SeaTunnelRow row : testCollector.getRows()) {
+            Assertions.assertEquals(row.getField(0).getClass(), Byte.class);
+            Assertions.assertEquals(row.getField(1).getClass(), Boolean.class);
+        }
     }
 
     public static class TestCollector implements Collector<SeaTunnelRow> {
 
+        private final List<SeaTunnelRow> rows = new ArrayList<>();
+
+        public List<SeaTunnelRow> getRows() {
+            return rows;
+        }
+
         @SuppressWarnings("checkstyle:RegexpSingleline")
         @Override
         public void collect(SeaTunnelRow record) {
             System.out.println(record);
-            Assertions.assertEquals(record.getField(16).getClass(), 
SeaTunnelRow.class);
+            rows.add(record);
         }
 
         @Override
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetReadStrategyTest.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetReadStrategyTest.java
index 5775c3659..de1d8d932 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetReadStrategyTest.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetReadStrategyTest.java
@@ -17,41 +17,142 @@
 
 package org.apache.seatunnel.connectors.seatunnel.file.writer;
 
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+
 import org.apache.seatunnel.api.source.Collector;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
 import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ParquetReadStrategy;
 
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
+import java.io.File;
 import java.net.URL;
 import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
 
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_DEFAULT;
 
 public class ParquetReadStrategyTest {
+    /**
+     * Reads {@code /timestamp_as_int64.parquet} without a plugin config and checks that a row
+     * type can be resolved; the rows are only collected and printed.
+     */
     @Test
-    public void testParquetRead() throws Exception {
-        URL resource = ParquetReadStrategyTest.class.getResource("/test.parquet");
-        assert resource != null;
+    public void testParquetRead1() throws Exception {
+        URL resource = ParquetReadStrategyTest.class.getResource("/timestamp_as_int64.parquet");
+        Assertions.assertNotNull(resource);
+        String path = Paths.get(resource.toURI()).toString();
+        ParquetReadStrategy parquetReadStrategy = new ParquetReadStrategy();
+        LocalConf localConf = new LocalConf(FS_DEFAULT_NAME_DEFAULT);
+        parquetReadStrategy.init(localConf);
+        SeaTunnelRowType seaTunnelRowTypeInfo =
+                parquetReadStrategy.getSeaTunnelRowTypeInfo(localConf, path);
+        Assertions.assertNotNull(seaTunnelRowTypeInfo);
+        System.out.println(seaTunnelRowTypeInfo);
+        TestCollector testCollector = new TestCollector();
+        parquetReadStrategy.read(path, testCollector);
+    }
+
+    /**
+     * Reads {@code /hive.parquet} without a plugin config and checks that a row type can be
+     * resolved; the rows are only collected and printed.
+     */
+    @Test
+    public void testParquetRead2() throws Exception {
+        URL resource = ParquetReadStrategyTest.class.getResource("/hive.parquet");
+        Assertions.assertNotNull(resource);
+        String path = Paths.get(resource.toURI()).toString();
+        ParquetReadStrategy parquetReadStrategy = new ParquetReadStrategy();
+        LocalConf localConf = new LocalConf(FS_DEFAULT_NAME_DEFAULT);
+        parquetReadStrategy.init(localConf);
+        SeaTunnelRowType seaTunnelRowTypeInfo =
+                parquetReadStrategy.getSeaTunnelRowTypeInfo(localConf, path);
+        Assertions.assertNotNull(seaTunnelRowTypeInfo);
+        System.out.println(seaTunnelRowTypeInfo);
+        TestCollector testCollector = new TestCollector();
+        parquetReadStrategy.read(path, testCollector);
+    }
+
+    /**
+     * Reads {@code /timestamp_as_int96.parquet} with the {@code read_columns} projection from
+     * {@code /test_read_parquet.conf} and checks the type and value of every projected field.
+     */
+    @Test
+    public void testParquetReadProjection1() throws Exception {
+        URL resource = ParquetReadStrategyTest.class.getResource("/timestamp_as_int96.parquet");
+        URL conf = ParquetReadStrategyTest.class.getResource("/test_read_parquet.conf");
+        Assertions.assertNotNull(resource);
+        Assertions.assertNotNull(conf);
         String path = Paths.get(resource.toURI()).toString();
+        String confPath = Paths.get(conf.toURI()).toString();
+        Config pluginConfig = ConfigFactory.parseFile(new File(confPath));
         ParquetReadStrategy parquetReadStrategy = new ParquetReadStrategy();
         LocalConf localConf = new LocalConf(FS_DEFAULT_NAME_DEFAULT);
         parquetReadStrategy.init(localConf);
+        parquetReadStrategy.setPluginConfig(pluginConfig);
         SeaTunnelRowType seaTunnelRowTypeInfo =
                 parquetReadStrategy.getSeaTunnelRowTypeInfo(localConf, path);
-        assert seaTunnelRowTypeInfo != null;
+        Assertions.assertNotNull(seaTunnelRowTypeInfo);
+        System.out.println(seaTunnelRowTypeInfo);
+        TestCollector testCollector = new TestCollector();
+        parquetReadStrategy.read(path, testCollector);
+        List<SeaTunnelRow> rows = testCollector.getRows();
+        // Guard against a vacuous pass: the loop below asserts nothing for an empty result
+        Assertions.assertFalse(rows.isEmpty());
+        for (SeaTunnelRow row : rows) {
+            // JUnit takes (expected, actual); the order decides how a failure is reported
+            Assertions.assertEquals(Long.class, row.getField(0).getClass());
+            Assertions.assertEquals(Byte.class, row.getField(1).getClass());
+            Assertions.assertEquals(Short.class, row.getField(2).getClass());
+            Assertions.assertEquals(40000000000L, row.getField(0));
+            Assertions.assertEquals((byte) 1, row.getField(1));
+            Assertions.assertEquals((short) 1, row.getField(2));
+        }
+    }
+
+    /**
+     * Reads {@code /hive.parquet} with the {@code read_columns} projection from {@code
+     * /test_read_parquet2.conf} and checks that a row type can be resolved.
+     */
+    @Test
+    public void testParquetReadProjection2() throws Exception {
+        URL resource = ParquetReadStrategyTest.class.getResource("/hive.parquet");
+        URL conf = ParquetReadStrategyTest.class.getResource("/test_read_parquet2.conf");
+        Assertions.assertNotNull(resource);
+        Assertions.assertNotNull(conf);
+        String path = Paths.get(resource.toURI()).toString();
+        String confPath = Paths.get(conf.toURI()).toString();
+        Config pluginConfig = ConfigFactory.parseFile(new File(confPath));
+        ParquetReadStrategy parquetReadStrategy = new ParquetReadStrategy();
+        LocalConf localConf = new LocalConf(FS_DEFAULT_NAME_DEFAULT);
+        parquetReadStrategy.init(localConf);
+        parquetReadStrategy.setPluginConfig(pluginConfig);
+        SeaTunnelRowType seaTunnelRowTypeInfo =
+                parquetReadStrategy.getSeaTunnelRowTypeInfo(localConf, path);
+        Assertions.assertNotNull(seaTunnelRowTypeInfo);
+        System.out.println(seaTunnelRowTypeInfo);
         TestCollector testCollector = new TestCollector();
         parquetReadStrategy.read(path, testCollector);
     }
 
     public static class TestCollector implements Collector<SeaTunnelRow> {
 
+        private final List<SeaTunnelRow> rows = new ArrayList<>();
+
+        public List<SeaTunnelRow> getRows() {
+            return rows;
+        }
+
         @SuppressWarnings("checkstyle:RegexpSingleline")
         @Override
         public void collect(SeaTunnelRow record) {
             System.out.println(record);
+            rows.add(record);
         }
 
         @Override
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/hive.parquet
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/hive.parquet
new file mode 100644
index 000000000..19a08e296
Binary files /dev/null and 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/hive.parquet
 differ
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/test_read_orc.conf
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/test_read_orc.conf
new file mode 100644
index 000000000..2f0bcc922
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/test_read_orc.conf
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+{
+  read_columns = [tinyint_col, boolean_col]
+}
\ No newline at end of file
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/test_read_parquet.conf
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/test_read_parquet.conf
new file mode 100644
index 000000000..45d30b161
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/test_read_parquet.conf
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+{
+  read_columns = [test_bigint, test_tinyint, test_smallint]
+}
\ No newline at end of file
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/test_read_parquet2.conf
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/test_read_parquet2.conf
new file mode 100644
index 000000000..9472c80ab
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/test_read_parquet2.conf
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+{
+  read_columns = [test_array, test_map]
+}
\ No newline at end of file
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/timestamp_as_int64.parquet
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/timestamp_as_int64.parquet
new file mode 100644
index 000000000..2c111e2bb
Binary files /dev/null and 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/timestamp_as_int64.parquet
 differ
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/test.parquet
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/timestamp_as_int96.parquet
similarity index 100%
rename from 
seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/test.parquet
rename to 
seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/timestamp_as_int96.parquet
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitter.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitter.java
index ed6c6700c..7d7c271e1 100644
--- 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitter.java
+++ 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitter.java
@@ -24,6 +24,7 @@ import 
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileSinkAggreg
 import 
org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils;
 import org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveMetaStoreProxy;
 
+import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
 import org.apache.thrift.TException;
 
 import lombok.extern.slf4j.Slf4j;
@@ -63,6 +64,8 @@ public class HiveSinkAggregatedCommitter extends 
FileSinkAggregatedCommitter {
                 try {
                     hiveMetaStore.addPartitions(dbName, tableName, partitions);
                     log.info("Add these partitions {}", partitions);
+                } catch (AlreadyExistsException e) {
+                    log.warn("These partitions {} are already exists", 
partitions);
                 } catch (TException e) {
                     log.error("Failed to add these partitions {}", partitions, 
e);
                     errorCommitInfos.add(aggregatedCommitInfo);
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
index 31a6a8a04..b773bc4bd 100644
--- 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
+++ 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
@@ -28,7 +28,6 @@ import org.apache.seatunnel.common.config.CheckConfigUtil;
 import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.constants.PluginType;
 import org.apache.seatunnel.common.exception.CommonErrorCode;
-import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
 import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
 import 
org.apache.seatunnel.connectors.seatunnel.file.hdfs.sink.BaseHdfsFileSink;
@@ -58,12 +57,16 @@ import static 
org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConf
 import static 
org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.FILE_FORMAT;
 import static 
org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.FILE_NAME_EXPRESSION;
 import static 
org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.FILE_PATH;
+import static 
org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.HAVE_PARTITION;
 import static 
org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.IS_PARTITION_FIELD_WRITE_IN_FILE;
 import static 
org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.PARTITION_BY;
+import static 
org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.PARTITION_DIR_EXPRESSION;
 import static 
org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.ROW_DELIMITER;
 import static 
org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.SINK_COLUMNS;
+import static 
org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig.METASTORE_URI;
 import static 
org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig.ORC_OUTPUT_FORMAT_CLASSNAME;
 import static 
org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig.PARQUET_OUTPUT_FORMAT_CLASSNAME;
+import static 
org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig.TABLE_NAME;
 import static 
org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig.TEXT_OUTPUT_FORMAT_CLASSNAME;
 
 @AutoService(SeaTunnelSink.class)
@@ -80,8 +83,7 @@ public class HiveSink extends BaseHdfsFileSink {
     @Override
     public void prepare(Config pluginConfig) throws PrepareFailException {
         CheckResult result =
-                CheckConfigUtil.checkAllExists(
-                        pluginConfig, HiveConfig.METASTORE_URI.key(), 
HiveConfig.TABLE_NAME.key());
+                CheckConfigUtil.checkAllExists(pluginConfig, 
METASTORE_URI.key(), TABLE_NAME.key());
         if (!result.isSuccess()) {
             throw new HiveConnectorException(
                     SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
@@ -89,12 +91,34 @@ public class HiveSink extends BaseHdfsFileSink {
                             "PluginName: %s, PluginType: %s, Message: %s",
                             getPluginName(), PluginType.SINK, 
result.getMsg()));
         }
-        if 
(pluginConfig.hasPath(BaseSinkConfig.PARTITION_DIR_EXPRESSION.key())) {
+        result =
+                CheckConfigUtil.checkAtLeastOneExists(
+                        pluginConfig,
+                        FILE_FORMAT.key(),
+                        FILE_PATH.key(),
+                        FIELD_DELIMITER.key(),
+                        ROW_DELIMITER.key(),
+                        IS_PARTITION_FIELD_WRITE_IN_FILE.key(),
+                        PARTITION_DIR_EXPRESSION.key(),
+                        HAVE_PARTITION.key(),
+                        SINK_COLUMNS.key(),
+                        PARTITION_BY.key());
+        if (result.isSuccess()) {
             throw new HiveConnectorException(
                     SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
                     String.format(
-                            "Hive sink connector does not support setting %s",
-                            BaseSinkConfig.PARTITION_DIR_EXPRESSION.key()));
+                            "Hive sink connector does not support these 
setting [%s]",
+                            String.join(
+                                    ",",
+                                    FILE_FORMAT.key(),
+                                    FILE_PATH.key(),
+                                    FIELD_DELIMITER.key(),
+                                    ROW_DELIMITER.key(),
+                                    IS_PARTITION_FIELD_WRITE_IN_FILE.key(),
+                                    PARTITION_DIR_EXPRESSION.key(),
+                                    HAVE_PARTITION.key(),
+                                    SINK_COLUMNS.key(),
+                                    PARTITION_BY.key())));
         }
         Pair<String[], Table> tableInfo = 
HiveConfig.getTableInfo(pluginConfig);
         dbName = tableInfo.getLeft()[0];
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSource.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSource.java
index c250b0a81..b4b5311f7 100644
--- 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSource.java
+++ 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSource.java
@@ -17,16 +17,21 @@
 
 package org.apache.seatunnel.connectors.seatunnel.hive.source;
 
+import 
org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigRenderOptions;
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;
 
 import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.table.type.SqlType;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
 import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.constants.PluginType;
 import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.common.utils.JsonUtils;
 import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfig;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
 import 
org.apache.seatunnel.connectors.seatunnel.file.hdfs.source.BaseHdfsFileSource;
@@ -35,15 +40,21 @@ import 
org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorErr
 import 
org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorException;
 
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Table;
 
 import com.google.auto.service.AutoService;
 
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 
 import static 
org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY;
+import static 
org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema.SCHEMA;
+import static 
org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfig.FILE_PATH;
+import static 
org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfig.FILE_TYPE;
 import static 
org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig.ORC_INPUT_FORMAT_CLASSNAME;
 import static 
org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig.PARQUET_INPUT_FORMAT_CLASSNAME;
 import static 
org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig.TEXT_INPUT_FORMAT_CLASSNAME;
@@ -69,6 +80,25 @@ public class HiveSource extends BaseHdfsFileSource {
                             "PluginName: %s, PluginType: %s, Message: %s",
                             getPluginName(), PluginType.SOURCE, 
result.getMsg()));
         }
+        result =
+                CheckConfigUtil.checkAtLeastOneExists(
+                        pluginConfig,
+                        SCHEMA.key(),
+                        FILE_TYPE.key(),
+                        FILE_PATH.key(),
+                        FS_DEFAULT_NAME_KEY);
+        if (result.isSuccess()) {
+            throw new HiveConnectorException(
+                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+                    String.format(
+                            "Hive source connector does not support these 
setting [%s]",
+                            String.join(
+                                    ",",
+                                    SCHEMA.key(),
+                                    FILE_TYPE.key(),
+                                    FILE_PATH.key(),
+                                    FS_DEFAULT_NAME_KEY)));
+        }
         if (pluginConfig.hasPath(BaseSourceConfig.READ_PARTITIONS.key())) {
             // verify partition list
             List<String> partitionsList =
@@ -96,17 +126,26 @@ public class HiveSource extends BaseHdfsFileSource {
         if (TEXT_INPUT_FORMAT_CLASSNAME.equals(inputFormat)) {
             pluginConfig =
                     pluginConfig.withValue(
-                            BaseSourceConfig.FILE_TYPE.key(),
+                            FILE_TYPE.key(),
                             
ConfigValueFactory.fromAnyRef(FileFormat.TEXT.toString()));
+            // Build the schema from the hive table information.
+            // The entrySet of typesafe config does not preserve key-value order,
+            // so jackson is used to keep the fields in their original order.
+            Map<String, Object> schema = parseSchema(tableInformation);
+            ConfigRenderOptions options = ConfigRenderOptions.concise();
+            String render = pluginConfig.root().render(options);
+            ObjectNode jsonNodes = JsonUtils.parseObject(render);
+            jsonNodes.putPOJO(SCHEMA.key(), schema);
+            pluginConfig = ConfigFactory.parseString(jsonNodes.toString());
         } else if (PARQUET_INPUT_FORMAT_CLASSNAME.equals(inputFormat)) {
             pluginConfig =
                     pluginConfig.withValue(
-                            BaseSourceConfig.FILE_TYPE.key(),
+                            FILE_TYPE.key(),
                             
ConfigValueFactory.fromAnyRef(FileFormat.PARQUET.toString()));
         } else if (ORC_INPUT_FORMAT_CLASSNAME.equals(inputFormat)) {
             pluginConfig =
                     pluginConfig.withValue(
-                            BaseSourceConfig.FILE_TYPE.key(),
+                            FILE_TYPE.key(),
                             
ConfigValueFactory.fromAnyRef(FileFormat.ORC.toString()));
         } else {
             throw new HiveConnectorException(
@@ -136,4 +175,92 @@ public class HiveSource extends BaseHdfsFileSource {
         }
         super.prepare(pluginConfig);
     }
+
+    /**
+     * Builds the schema option of a hive text table from its storage descriptor columns.
+     *
+     * @param table hive table whose columns are converted
+     * @return insertion-ordered map with a single {@code fields} entry that maps every column
+     *     name to its converted type
+     */
+    private Map<String, Object> parseSchema(Table table) {
+        LinkedHashMap<String, Object> fields = new LinkedHashMap<>();
+        LinkedHashMap<String, Object> schema = new LinkedHashMap<>();
+        List<FieldSchema> cols = table.getSd().getCols();
+        for (FieldSchema col : cols) {
+            String name = col.getName();
+            String type = col.getType();
+            fields.put(name, covertHiveTypeToSeaTunnelType(type));
+        }
+        schema.put("fields", fields);
+        return schema;
+    }
+
+    /**
+     * Converts a hive column type string into the value placed in the generated schema.
+     *
+     * <p>Struct types are handled first so that keywords contained in struct field names (for
+     * example {@code struct<binary_flag:int>}) are not mistaken for the column type.
+     *
+     * @param hiveType hive type string such as {@code varchar(10)} or {@code struct<a:int>}
+     * @return a type name, or an insertion-ordered map of field name to type for struct types
+     * @throws HiveConnectorException if the type is a char type or a struct field is malformed
+     */
+    private Object covertHiveTypeToSeaTunnelType(String hiveType) {
+        if (hiveType.startsWith("struct<") && hiveType.endsWith(">")) {
+            String structBody = hiveType.substring("struct<".length(), hiveType.length() - 1);
+            return parseStructFields(structBody);
+        }
+        if (hiveType.contains("varchar")) {
+            // Use the name so every scalar branch yields a plain string, like BYTES below
+            return SqlType.STRING.name();
+        }
+        if (hiveType.contains("char")) {
+            throw new HiveConnectorException(
+                    CommonErrorCode.UNSUPPORTED_DATA_TYPE,
+                    "SeaTunnel hive connector does not supported char type in text table");
+        }
+        if (hiveType.contains("binary")) {
+            return SqlType.BYTES.name();
+        }
+        return hiveType;
+    }
+
+    /**
+     * Parses the text between the angle brackets of a struct type into its fields.
+     *
+     * <p>Fields are separated only at commas outside nested {@code <...>} or {@code (...)} so
+     * that types such as {@code map<string,int>} or {@code decimal(10,2)} stay in one piece.
+     */
+    private Map<String, Object> parseStructFields(String structBody) {
+        LinkedHashMap<String, Object> fields = new LinkedHashMap<>();
+        int depth = 0;
+        int fieldStart = 0;
+        for (int i = 0; i < structBody.length(); i++) {
+            char c = structBody.charAt(i);
+            if (c == '<' || c == '(') {
+                depth++;
+            } else if (c == '>' || c == ')') {
+                depth--;
+            } else if (c == ',' && depth == 0) {
+                putStructField(fields, structBody.substring(fieldStart, i));
+                fieldStart = i + 1;
+            }
+        }
+        // The last field is not followed by a comma
+        putStructField(fields, structBody.substring(fieldStart));
+        return fields;
+    }
+
+    /** Adds one {@code name:type} struct field; only the first colon separates name and type. */
+    private void putStructField(Map<String, Object> fields, String field) {
+        int colon = field.indexOf(':');
+        if (colon < 0) {
+            throw new HiveConnectorException(
+                    CommonErrorCode.UNSUPPORTED_DATA_TYPE,
+                    "Can not parse hive struct field [" + field + "], expected name:type");
+        }
+        String fieldType = field.substring(colon + 1);
+        fields.put(field.substring(0, colon), covertHiveTypeToSeaTunnelType(fieldType));
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java
index 1a1f954ae..f53384a41 100644
--- 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java
+++ 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java
@@ -19,26 +19,22 @@ package 
org.apache.seatunnel.connectors.seatunnel.hive.utils;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
-import org.apache.seatunnel.common.exception.CommonErrorCode;
 import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfig;
-import 
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils;
 import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorErrorCode;
 import 
org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorException;
 
-import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.thrift.TException;
 
 import lombok.NonNull;
 import lombok.extern.slf4j.Slf4j;
 
-import java.io.IOException;
 import java.util.List;
 import java.util.Objects;
 
@@ -55,34 +51,8 @@ public class HiveMetaStoreProxy {
                 && 
config.hasPath(BaseSourceConfig.KERBEROS_KEYTAB_PATH.key())) {
             String principal = 
config.getString(BaseSourceConfig.KERBEROS_PRINCIPAL.key());
             String keytabPath = 
config.getString(BaseSourceConfig.KERBEROS_KEYTAB_PATH.key());
-            if (StringUtils.isBlank(principal) || 
StringUtils.isBlank(keytabPath)) {
-                String errorMsg =
-                        String.format(
-                                "Kerberos principal [%s] or keytab file path 
[%s] is blank,"
-                                        + "please check",
-                                principal, keytabPath);
-                throw new HiveConnectorException(
-                        CommonErrorCode.KERBEROS_AUTHORIZED_FAILED, errorMsg);
-            }
             Configuration configuration = new Configuration();
-            configuration.set("hadoop.security.authentication", "kerberos");
-            UserGroupInformation.setConfiguration(configuration);
-            try {
-                log.info(
-                        "Start Kerberos authentication using principal {} and 
keytab {}",
-                        principal,
-                        keytabPath);
-                UserGroupInformation.loginUserFromKeytab(principal, 
keytabPath);
-                log.info("Kerberos authentication successful");
-            } catch (IOException e) {
-                String errorMsg =
-                        String.format(
-                                "Kerberos authentication failed using this "
-                                        + "principal [%s] and keytab path 
[%s]",
-                                principal, keytabPath);
-                throw new FileConnectorException(
-                        CommonErrorCode.KERBEROS_AUTHORIZED_FAILED, errorMsg, 
e);
-            }
+            FileSystemUtils.doKerberosAuthentication(configuration, principal, 
keytabPath);
         }
         try {
             hiveMetaStoreClient = new HiveMetaStoreClient(hiveConf);
diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
index 9cc18be96..8c4fcc490 100644
--- 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
@@ -46,6 +46,7 @@ import 
org.apache.seatunnel.format.json.JsonDeserializationSchema;
 import org.apache.seatunnel.format.json.canal.CanalJsonDeserializationSchema;
 import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
 import org.apache.seatunnel.format.text.TextDeserializationSchema;
+import org.apache.seatunnel.format.text.constant.TextFormatConstant;
 
 import org.apache.kafka.common.TopicPartition;
 
@@ -259,7 +260,7 @@ public class KafkaSource
             this.deserializationSchema =
                     TextDeserializationSchema.builder()
                             .seaTunnelRowType(typeInfo)
-                            .delimiter(String.valueOf('\002'))
+                            .delimiter(TextFormatConstant.PLACEHOLDER)
                             .build();
         }
     }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java
index 31aed642e..a8bbdf0d1 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java
@@ -70,6 +70,10 @@ public class LocalFileIT extends TestSuiteBase {
         Container.ExecResult textReadResult =
                 container.executeJob("/text/local_file_text_to_assert.conf");
         Assertions.assertEquals(0, textReadResult.getExitCode());
+        // test read local text file with projection
+        Container.ExecResult textProjectionResult =
+                container.executeJob("/text/local_file_text_projection_to_assert.conf");
+        Assertions.assertEquals(0, textProjectionResult.getExitCode());
         // test write local json file
         Container.ExecResult jsonWriteResult =
                 container.executeJob("/json/fake_to_local_file_json.conf");
@@ -86,6 +90,10 @@ public class LocalFileIT extends TestSuiteBase {
         Container.ExecResult orcReadResult =
                 container.executeJob("/orc/local_file_orc_to_assert.conf");
         Assertions.assertEquals(0, orcReadResult.getExitCode());
+        // test read local orc file with projection
+        Container.ExecResult orcProjectionResult =
+                container.executeJob("/orc/local_file_orc_projection_to_assert.conf");
+        Assertions.assertEquals(0, orcProjectionResult.getExitCode());
         // test write local parquet file
         Container.ExecResult parquetWriteResult =
                 container.executeJob("/parquet/fake_to_local_file_parquet.conf");
@@ -94,5 +102,9 @@ public class LocalFileIT extends TestSuiteBase {
         Container.ExecResult parquetReadResult =
                 container.executeJob("/parquet/local_file_parquet_to_assert.conf");
         Assertions.assertEquals(0, parquetReadResult.getExitCode());
+        // test read local parquet file with projection
+        Container.ExecResult parquetProjectionResult =
+                container.executeJob("/parquet/local_file_parquet_projection_to_assert.conf");
+        Assertions.assertEquals(0, parquetProjectionResult.getExitCode());
     }
 }
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/e2e.orc b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/e2e.orc
index 507ce3703..d50f6bb54 100644
Binary files a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/e2e.orc and b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/e2e.orc differ
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/local_file_orc_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/local_file_orc_projection_to_assert.conf
similarity index 80%
copy from seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/local_file_orc_to_assert.conf
copy to seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/local_file_orc_projection_to_assert.conf
index d99fef31a..9cde028af 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/local_file_orc_to_assert.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/local_file_orc_projection_to_assert.conf
@@ -27,8 +27,9 @@ env {
 
 source {
   LocalFile {
-    path = "/seatunnel/read/parquet"
-    type = "parquet"
+    path = "/seatunnel/read/orc"
+    type = "orc"
+    read_columns = [c_string, c_boolean, c_double]
     result_table_name = "fake"
   }
 }
@@ -69,24 +70,6 @@ sink {
               rule_type = NOT_NULL
             }
           ]
-        },
-        {
-          field_name = name
-          field_type = string
-          field_value = [
-            {
-              rule_type = NOT_NULL
-            }
-          ]
-        },
-        {
-          field_name = hobby
-          field_type = string
-          field_value = [
-            {
-              rule_type = NOT_NULL
-            }
-          ]
         }
       ]
     }
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/local_file_orc_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/local_file_orc_to_assert.conf
index d99fef31a..7549af6fd 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/local_file_orc_to_assert.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/local_file_orc_to_assert.conf
@@ -27,8 +27,8 @@ env {
 
 source {
   LocalFile {
-    path = "/seatunnel/read/parquet"
-    type = "parquet"
+    path = "/seatunnel/read/orc"
+    type = "orc"
     result_table_name = "fake"
   }
 }
@@ -69,24 +69,6 @@ sink {
               rule_type = NOT_NULL
             }
           ]
-        },
-        {
-          field_name = name
-          field_type = string
-          field_value = [
-            {
-              rule_type = NOT_NULL
-            }
-          ]
-        },
-        {
-          field_name = hobby
-          field_type = string
-          field_value = [
-            {
-              rule_type = NOT_NULL
-            }
-          ]
         }
       ]
     }
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/local_file_orc_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/parquet/local_file_parquet_projection_to_assert.conf
similarity index 83%
copy from seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/local_file_orc_to_assert.conf
copy to seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/parquet/local_file_parquet_projection_to_assert.conf
index d99fef31a..dac3515a0 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/local_file_orc_to_assert.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/parquet/local_file_parquet_projection_to_assert.conf
@@ -29,6 +29,7 @@ source {
   LocalFile {
     path = "/seatunnel/read/parquet"
     type = "parquet"
+    read_columns = [c_string, c_boolean, c_double]
     result_table_name = "fake"
   }
 }
@@ -69,24 +70,6 @@ sink {
               rule_type = NOT_NULL
             }
           ]
-        },
-        {
-          field_name = name
-          field_type = string
-          field_value = [
-            {
-              rule_type = NOT_NULL
-            }
-          ]
-        },
-        {
-          field_name = hobby
-          field_type = string
-          field_value = [
-            {
-              rule_type = NOT_NULL
-            }
-          ]
         }
       ]
     }
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/e2e.txt b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/e2e.txt
index 727d77044..9871cd85e 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/e2e.txt
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/e2e.txt
@@ -1,5 +1,5 @@
-{"Eayvq":"YVrWJ","kegtW":"jshQP","ShRel":"abmwR","vDjnr":"gnaVl","zGERi":"nlZlS"}[1208226851,1011688192,220827945,1047769453,1137173284]ajNqtfalse1203130814396025058012143372451870729.532721E371.566752199436781E308eqGaK2022-05-0984150670027331165724.3898288671923612152022-01-10
 
00:52:00{"YZCPo":"saHIr","NXuEq":"lRvfb","rpZUW":"TsaaM","fySFX":"jRTjq","kxmOB":"IYcoV"}[2014870053,823563564,783454480,651161159,704692328]mMPmRtrue221632040893809116054760376851640323.1975
 [...]
-{"njZoW":"GMeYm","UcdNW":"QzeUB","KEUVX":"gBDFK","HYQAp":"wrkAU","CIuxE":"xbFow"}[2031111683,1178991118,2039039138,1613817066,110479538]NJdgitrue-89290489565971423719286406107975683.2706686E385.558510322527257E305TsBfe2022-11-0226360128806482056292.3973861503683968622022-07-19
 
01:55:00{"OYtro":"ujYpV","CmwWw":"scFCA","sLsBv":"OWHCB","pyEBC":"IBJaJ","dcnST":"EWNUE"}[1375044625,1699399583,1568825650,1259259717,1993279080]xGYNztrue-1061370319488882359202931209018536960
 [...]
-{"RHdeD":"IfOaZ","qrTJC":"kyLmx","aangN":"lAeap","ZoLNq":"BUFPs","xgRbl":"wxinc"}[164178112,1126624441,15024852,1149751089,1521818730]YvrXsfalse232515032008388530651078388625039361.6630125E381.7843554887310442E308vpGjb2022-03-2178172562452015409780.5845612134798772192022-08-13
 
07:35:00{"tzgLt":"kDbVh","WQjRn":"qJbFw","CkgpP":"VAefn","ugYhg":"JsunV","IiEeb":"quvGf"}[1776928121,1957734724,524685509,561366753,938508932]mkHAitrue922741247279574969724970679742218241.7999
 [...]
-{"jjfbQ":"YnIXv","yENje":"ffDvT","eMpYW":"LZvRw","oheFK":"QjTUE","OULct":"xrYhB"}[104016153,1372901715,1383968877,1615642149,195294861]vMstWfalse-7625594116342148311317935807474656001.792487E381.0428995536163822E308JSzTD2022-05-2455390102538242453238.6893233870974102472022-04-02
 
03:07:00{"YISEv":"PRNAA","iOZbP":"RuJAC","xUpOy":"KQrHA","KDQtr":"UZwht","KRfJK":"KZpCe"}[1746626910,1211890529,1600627099,866858633,1489006903]wfDHTfalse-126224418100044198439134512658444288
 [...]
-{"Bdbdl":"oODJP","KqLlu":"vyGtf","jvGPe":"yRctE","rRyXw":"BSABQ","rHvsQ":"VCUqr"}[333070022,1078936427,1553422323,979329793,1601100765]Zbmuofalse1081556163411943031919830971434823681.1692632E381.0154981418689449E308IuDpI2022-06-1518027895538483530319.4903776087281118622022-06-23
 
00:40:00{"cqeow":"wXwCu","nFjrN":"zWila","lTedA":"WrGgl","NPLcF":"RXUhL","EIzgS":"MFnLE"}[814534518,1778618727,2107481454,1240038762,974703010]HtbvRtrue12423258182702762422924058106246453761
 [...]
\ No newline at end of file
+uDDrwsQQYONTNeUBIOnLAgunvDqLBObroRzdEdvDgRmgaeFyFH5456857591576298739157764687713794636442057612252MTDnafalse3313846190943192276641872220071936002.4798444E389.52375328387482E307vcIGF2023-06-0776258155390368615610.7646252373186602912023-05-08
 
16:08:51ipToEdierOAbwQfQzObWqiRhjkWYaMKdCbjurhstsWrAVlRyyR2905930362869031292782506910815576701385108050hArFutrue12631169122166306155952414159791708165.949173E372.1775762383875058E307kMlgO2023-05-20
 [...]
+QIpzzZNFkLwARZDSdwdBzkegCdIRVYJnuXgxNXytAJxxaTzmDF16603816781145850255103997497062535321459349811xaTOktrue5327578191749099325840234439082792961.955231E381.5072154481920294E308GDWOu2023-05-0581449039533149712064.4515003874168475032023-07-06
 
22:34:11sfgxhqvOLzjdTSNcNaWfEnZqvQraSSuMPazCGhPmSrGuxggqGh111449466287130860562118177510004750271267350957FDhTstrue96247293946402921952995131535667203.3240283E384.473485404447698E307YFdwf2023-02-04294
 [...]
+xVJPgVlosBlTYSkmJCqKHMXzbZkNQKInuVMZeYGhsmzUmcLyPx137745493211075991209783701051546835517166168384qcYaifalse8318050110096656524405690917018449922.9617934E371.8901064340036343E307jaKMq2023-05-1275317114043170470995.9654034735914367862023-05-18
 
08:09:22raGGBnHsNwMZKemkFErUbedNjSllNcKOVUGdTpXcHGSVphHsNE86377304018502081846122308810391870441519757437JCRZStrue1829974183977114228752256792969205767.9090967E371.6286963710372255E308NBHUB2023-05-0
 [...]
+dBgFeTKkCfnxCljyGfNEurEzCVgwpsHgmcOfYXiQHxeeQNjQuq1961913761867016982512369059615238191571813320BTfhbfalse652666522281866957533025299230722.1456136E381.2398422714159417E308YOiwg2023-10-2433001899362876139955.7235198795513055732023-06-23
 
13:46:46jsvmHLHlXCGFKwuqlTwAjdMckElrmqgBWvOuuKuWxcinFZWSky19959088245502706421265289671411088181469730839vUyULtrue952655754382886132164227350822215681.9033253E381.0966562906060974E308XFeKf2023-09-1731084
 [...]
+obtYzIHOTKsABVtirEKEMYUYobsYlDJcFbpQUYvGxCcKlnswEG8096984004544201585383739017658796661353001394xchcntrue853141253976762312923177914159380482.8480754E381.055208146200822E308MSkTD2023-11-2420361788179232141281.9718823433892185262023-10-25
 
11:47:50gdCWZMGESyarjQPopBhDwKnOyDvaUDgQOEDRCmfUAagfnDDPqV8473436731118772451890654127233667151574025969ewJzLtrue6321769209768782446484076920790579202.7134378E381.1883616449174808E308STvOu2023-10-082179
 [...]
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/local_file_orc_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/local_file_text_projection_to_assert.conf
similarity index 68%
copy from seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/local_file_orc_to_assert.conf
copy to seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/local_file_text_projection_to_assert.conf
index d99fef31a..9a70cf768 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/local_file_orc_to_assert.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/local_file_text_projection_to_assert.conf
@@ -27,8 +27,43 @@ env {
 
 source {
   LocalFile {
-    path = "/seatunnel/read/parquet"
-    type = "parquet"
+    path = "/seatunnel/read/text"
+    type = "text"
+    read_columns = [c_string, c_boolean, c_double]
+    schema = {
+      fields {
+        c_map = "map<string, string>"
+        c_array = "array<int>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_bytes = bytes
+        c_date = date
+        c_decimal = "decimal(38, 18)"
+        c_timestamp = timestamp
+        c_row = {
+          c_map = "map<string, string>"
+          c_array = "array<int>"
+          c_string = string
+          c_boolean = boolean
+          c_tinyint = tinyint
+          c_smallint = smallint
+          c_int = int
+          c_bigint = bigint
+          c_float = float
+          c_double = double
+          c_bytes = bytes
+          c_date = date
+          c_decimal = "decimal(38, 18)"
+          c_timestamp = timestamp
+        }
+      }
+    }
     result_table_name = "fake"
   }
 }
diff --git a/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextDeserializationSchema.java b/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextDeserializationSchema.java
index e2476ffe3..86f25a69b 100644
--- a/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextDeserializationSchema.java
+++ b/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextDeserializationSchema.java
@@ -17,8 +17,6 @@
 
 package org.apache.seatunnel.format.text;
 
-import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ArrayNode;
-
 import org.apache.seatunnel.api.serialization.DeserializationSchema;
 import org.apache.seatunnel.api.table.type.ArrayType;
 import org.apache.seatunnel.api.table.type.BasicType;
@@ -26,44 +24,100 @@ import org.apache.seatunnel.api.table.type.MapType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.api.table.type.SqlType;
 import org.apache.seatunnel.common.exception.CommonErrorCode;
 import org.apache.seatunnel.common.utils.DateTimeUtils;
 import org.apache.seatunnel.common.utils.DateUtils;
-import org.apache.seatunnel.common.utils.JsonUtils;
 import org.apache.seatunnel.common.utils.TimeUtils;
+import org.apache.seatunnel.format.text.constant.TextFormatConstant;
 import org.apache.seatunnel.format.text.exception.SeaTunnelTextFormatException;
 
 import org.apache.commons.lang3.StringUtils;
 
-import lombok.Builder;
 import lombok.NonNull;
 
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.LinkedHashMap;
 import java.util.Map;
 
-@Builder
 public class TextDeserializationSchema implements DeserializationSchema<SeaTunnelRow> {
-    @NonNull private SeaTunnelRowType seaTunnelRowType;
-    @NonNull private String delimiter;
-    @Builder.Default private DateUtils.Formatter dateFormatter = DateUtils.Formatter.YYYY_MM_DD;
+    private final SeaTunnelRowType seaTunnelRowType;
+    private final String[] separators;
+    private final DateUtils.Formatter dateFormatter;
+    private final DateTimeUtils.Formatter dateTimeFormatter;
+    private final TimeUtils.Formatter timeFormatter;
+
+    private TextDeserializationSchema(
+            @NonNull SeaTunnelRowType seaTunnelRowType,
+            String[] separators,
+            DateUtils.Formatter dateFormatter,
+            DateTimeUtils.Formatter dateTimeFormatter,
+            TimeUtils.Formatter timeFormatter) {
+        this.seaTunnelRowType = seaTunnelRowType;
+        this.separators = separators;
+        this.dateFormatter = dateFormatter;
+        this.dateTimeFormatter = dateTimeFormatter;
+        this.timeFormatter = timeFormatter;
+    }
 
-    @Builder.Default
-    private DateTimeUtils.Formatter dateTimeFormatter = DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS;
+    public static Builder builder() {
+        return new Builder();
+    }
 
-    @Builder.Default private TimeUtils.Formatter timeFormatter = TimeUtils.Formatter.HH_MM_SS;
+    public static class Builder {
+        private SeaTunnelRowType seaTunnelRowType;
+        private String[] separators = TextFormatConstant.SEPARATOR.clone();
+        private DateUtils.Formatter dateFormatter = DateUtils.Formatter.YYYY_MM_DD;
+        private DateTimeUtils.Formatter dateTimeFormatter =
+                DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS;
+        private TimeUtils.Formatter timeFormatter = TimeUtils.Formatter.HH_MM_SS;
+
+        private Builder() {}
+
+        public Builder seaTunnelRowType(SeaTunnelRowType seaTunnelRowType) {
+            this.seaTunnelRowType = seaTunnelRowType;
+            return this;
+        }
+
+        public Builder delimiter(String delimiter) {
+            this.separators[0] = delimiter;
+            return this;
+        }
+
+        public Builder separators(String[] separators) {
+            this.separators = separators;
+            return this;
+        }
+
+        public Builder dateFormatter(DateUtils.Formatter dateFormatter) {
+            this.dateFormatter = dateFormatter;
+            return this;
+        }
+
+        public Builder dateTimeFormatter(DateTimeUtils.Formatter dateTimeFormatter) {
+            this.dateTimeFormatter = dateTimeFormatter;
+            return this;
+        }
+
+        public Builder timeFormatter(TimeUtils.Formatter timeFormatter) {
+            this.timeFormatter = timeFormatter;
+            return this;
+        }
+
+        public TextDeserializationSchema build() {
+            return new TextDeserializationSchema(
+                    seaTunnelRowType, separators, dateFormatter, dateTimeFormatter, timeFormatter);
+        }
+    }
 
     @Override
     public SeaTunnelRow deserialize(byte[] message) throws IOException {
         String content = new String(message);
-        Map<Integer, String> splitsMap = splitLineBySeaTunnelRowType(content, seaTunnelRowType);
+        Map<Integer, String> splitsMap = splitLineBySeaTunnelRowType(content, seaTunnelRowType, 0);
         Object[] objects = new Object[seaTunnelRowType.getTotalFields()];
         for (int i = 0; i < objects.length; i++) {
-            objects[i] = convert(splitsMap.get(i), seaTunnelRowType.getFieldType(i));
+            objects[i] = convert(splitsMap.get(i), seaTunnelRowType.getFieldType(i), 0);
         }
         return new SeaTunnelRow(objects);
     }
@@ -74,50 +128,34 @@ public class TextDeserializationSchema implements DeserializationSchema<SeaTunne
     }
 
     private Map<Integer, String> splitLineBySeaTunnelRowType(
-            String line, SeaTunnelRowType seaTunnelRowType) {
-        String[] splits = line.split(delimiter, -1);
+            String line, SeaTunnelRowType seaTunnelRowType, int level) {
+        String[] splits = line.split(separators[level], -1);
         LinkedHashMap<Integer, String> splitsMap = new LinkedHashMap<>();
         SeaTunnelDataType<?>[] fieldTypes = seaTunnelRowType.getFieldTypes();
-        int cursor = 0;
-        for (int i = 0; i < fieldTypes.length; i++) {
-            if (fieldTypes[i].getSqlType() == SqlType.ROW) {
-                // row type
-                int totalFields = ((SeaTunnelRowType) fieldTypes[i]).getTotalFields();
-                // if current field is empty
-                if (cursor >= splits.length) {
-                    splitsMap.put(i, null);
-                } else {
-                    ArrayList<String> rowSplits =
-                            new ArrayList<>(
-                                    Arrays.asList(splits).subList(cursor, cursor + totalFields));
-                    splitsMap.put(i, String.join(delimiter, rowSplits));
-                }
-                cursor += totalFields;
-            } else {
-                // not row type
-                // if current field is empty
-                if (cursor >= splits.length) {
-                    splitsMap.put(i, null);
-                    cursor++;
-                } else {
-                    splitsMap.put(i, splits[cursor++]);
-                }
+        for (int i = 0; i < splits.length; i++) {
+            splitsMap.put(i, splits[i]);
+        }
+        if (fieldTypes.length > splits.length) {
+            // contains partition columns
+            for (int i = splits.length; i < fieldTypes.length; i++) {
+                splitsMap.put(i, null);
             }
         }
         return splitsMap;
     }
 
-    private Object convert(String field, SeaTunnelDataType<?> fieldType) {
+    private Object convert(String field, SeaTunnelDataType<?> fieldType, int level) {
         if (StringUtils.isBlank(field)) {
             return null;
         }
         switch (fieldType.getSqlType()) {
             case ARRAY:
                 BasicType<?> elementType = ((ArrayType<?, ?>) fieldType).getElementType();
-                ArrayNode jsonNodes = JsonUtils.parseArray(field);
+                String[] elements = field.split(separators[level + 1]);
                 ArrayList<Object> objectArrayList = new ArrayList<>();
-                jsonNodes.forEach(
-                        jsonNode -> objectArrayList.add(convert(jsonNode.toString(), elementType)));
+                for (String element : elements) {
+                    objectArrayList.add(convert(element, elementType, level + 1));
+                }
                 switch (elementType.getSqlType()) {
                     case STRING:
                         return objectArrayList.toArray(new String[0]);
@@ -146,10 +184,13 @@ public class TextDeserializationSchema implements DeserializationSchema<SeaTunne
                 SeaTunnelDataType<?> keyType = ((MapType<?, ?>) fieldType).getKeyType();
                 SeaTunnelDataType<?> valueType = ((MapType<?, ?>) fieldType).getValueType();
                 LinkedHashMap<Object, Object> objectMap = new LinkedHashMap<>();
-                Map<String, String> fieldsMap = JsonUtils.toMap(field);
-                fieldsMap.forEach(
-                        (key, value) ->
-                                objectMap.put(convert(key, keyType), convert(value, valueType)));
+                String[] kvs = field.split(separators[level + 1]);
+                for (String kv : kvs) {
+                    String[] splits = kv.split(separators[level + 2]);
+                    objectMap.put(
+                            convert(splits[0], keyType, level + 1),
+                            convert(splits[1], valueType, level + 1));
+                }
                 return objectMap;
             case STRING:
                 return field;
@@ -181,13 +222,14 @@ public class TextDeserializationSchema implements DeserializationSchema<SeaTunne
                 return DateTimeUtils.parse(field, dateTimeFormatter);
             case ROW:
                 Map<Integer, String> splitsMap =
-                        splitLineBySeaTunnelRowType(field, (SeaTunnelRowType) fieldType);
+                        splitLineBySeaTunnelRowType(field, (SeaTunnelRowType) fieldType, level + 1);
                 Object[] objects = new Object[splitsMap.size()];
                 for (int i = 0; i < objects.length; i++) {
                     objects[i] =
                             convert(
                                     splitsMap.get(i),
-                                    ((SeaTunnelRowType) fieldType).getFieldType(i));
+                                    ((SeaTunnelRowType) fieldType).getFieldType(i),
+                                    level + 1);
                 }
                 return new SeaTunnelRow(objects);
             default:
diff --git a/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextSerializationSchema.java b/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextSerializationSchema.java
index 1c7293adf..59a65e1e2 100644
--- a/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextSerializationSchema.java
+++ b/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextSerializationSchema.java
@@ -18,14 +18,17 @@
 package org.apache.seatunnel.format.text;
 
 import org.apache.seatunnel.api.serialization.SerializationSchema;
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.MapType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.exception.CommonErrorCode;
 import org.apache.seatunnel.common.utils.DateTimeUtils;
 import org.apache.seatunnel.common.utils.DateUtils;
-import org.apache.seatunnel.common.utils.JsonUtils;
 import org.apache.seatunnel.common.utils.TimeUtils;
+import org.apache.seatunnel.format.text.constant.TextFormatConstant;
 import org.apache.seatunnel.format.text.exception.SeaTunnelTextFormatException;
 
 import lombok.Builder;
@@ -34,17 +37,79 @@ import lombok.NonNull;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.LocalTime;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.stream.Collectors;
 
-@Builder
 public class TextSerializationSchema implements SerializationSchema {
-    @NonNull private SeaTunnelRowType seaTunnelRowType;
-    @NonNull private String delimiter;
-    @Builder.Default private DateUtils.Formatter dateFormatter = DateUtils.Formatter.YYYY_MM_DD;
+    private final SeaTunnelRowType seaTunnelRowType;
+    private final String[] separators;
+    private final DateUtils.Formatter dateFormatter;
+    private final DateTimeUtils.Formatter dateTimeFormatter;
+    private final TimeUtils.Formatter timeFormatter;
 
-    @Builder.Default
-    private DateTimeUtils.Formatter dateTimeFormatter = DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS;
+    private TextSerializationSchema(
+            @NonNull SeaTunnelRowType seaTunnelRowType,
+            String[] separators,
+            DateUtils.Formatter dateFormatter,
+            DateTimeUtils.Formatter dateTimeFormatter,
+            TimeUtils.Formatter timeFormatter) {
+        this.seaTunnelRowType = seaTunnelRowType;
+        this.separators = separators;
+        this.dateFormatter = dateFormatter;
+        this.dateTimeFormatter = dateTimeFormatter;
+        this.timeFormatter = timeFormatter;
+    }
+
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    public static class Builder {
+        private SeaTunnelRowType seaTunnelRowType;
+        private String[] separators = TextFormatConstant.SEPARATOR.clone();
+        private DateUtils.Formatter dateFormatter = DateUtils.Formatter.YYYY_MM_DD;
+        private DateTimeUtils.Formatter dateTimeFormatter =
+                DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS;
+        private TimeUtils.Formatter timeFormatter = TimeUtils.Formatter.HH_MM_SS;
+
+        private Builder() {}
 
-    @Builder.Default private TimeUtils.Formatter timeFormatter = TimeUtils.Formatter.HH_MM_SS;
+        public Builder seaTunnelRowType(SeaTunnelRowType seaTunnelRowType) {
+            this.seaTunnelRowType = seaTunnelRowType;
+            return this;
+        }
+
+        public Builder delimiter(String delimiter) {
+            this.separators[0] = delimiter;
+            return this;
+        }
+
+        public Builder separators(String[] separators) {
+            this.separators = separators;
+            return this;
+        }
+
+        public Builder dateFormatter(DateUtils.Formatter dateFormatter) {
+            this.dateFormatter = dateFormatter;
+            return this;
+        }
+
+        public Builder dateTimeFormatter(DateTimeUtils.Formatter dateTimeFormatter) {
+            this.dateTimeFormatter = dateTimeFormatter;
+            return this;
+        }
+
+        public Builder timeFormatter(TimeUtils.Formatter timeFormatter) {
+            this.timeFormatter = timeFormatter;
+            return this;
+        }
+
+        public TextSerializationSchema build() {
+            return new TextSerializationSchema(
+                    seaTunnelRowType, separators, dateFormatter, dateTimeFormatter, timeFormatter);
+        }
+    }
 
     @Override
     public byte[] serialize(SeaTunnelRow element) {
@@ -55,12 +120,12 @@ public class TextSerializationSchema implements SerializationSchema {
         Object[] fields = element.getFields();
         String[] strings = new String[fields.length];
         for (int i = 0; i < fields.length; i++) {
-            strings[i] = convert(fields[i], seaTunnelRowType.getFieldType(i));
+            strings[i] = convert(fields[i], seaTunnelRowType.getFieldType(i), 0);
         }
-        return String.join(delimiter, strings).getBytes();
+        return String.join(separators[0], strings).getBytes();
     }
 
-    private String convert(Object field, SeaTunnelDataType<?> fieldType) {
+    private String convert(Object field, SeaTunnelDataType<?> fieldType, int level) {
         if (field == null) {
             return "";
         }
@@ -86,15 +151,36 @@ public class TextSerializationSchema implements 
SerializationSchema {
             case BYTES:
                 return new String((byte[]) field);
             case ARRAY:
+                BasicType<?> elementType = ((ArrayType<?, ?>) 
fieldType).getElementType();
+                return Arrays.stream((Object[]) field)
+                        .map(f -> convert(f, elementType, level + 1))
+                        .collect(Collectors.joining(separators[level + 1]));
             case MAP:
-                return JsonUtils.toJsonString(field);
+                SeaTunnelDataType<?> keyType = ((MapType<?, ?>) 
fieldType).getKeyType();
+                SeaTunnelDataType<?> valueType = ((MapType<?, ?>) 
fieldType).getValueType();
+                return ((Map<Object, Object>) field)
+                        .entrySet().stream()
+                                .map(
+                                        entry ->
+                                                String.join(
+                                                        separators[level + 2],
+                                                        
convert(entry.getKey(), keyType, level + 1),
+                                                        convert(
+                                                                
entry.getValue(),
+                                                                valueType,
+                                                                level + 1)))
+                                .collect(Collectors.joining(separators[level + 
1]));
             case ROW:
                 Object[] fields = ((SeaTunnelRow) field).getFields();
                 String[] strings = new String[fields.length];
                 for (int i = 0; i < fields.length; i++) {
-                    strings[i] = convert(fields[i], ((SeaTunnelRowType) 
fieldType).getFieldType(i));
+                    strings[i] =
+                            convert(
+                                    fields[i],
+                                    ((SeaTunnelRowType) 
fieldType).getFieldType(i),
+                                    level + 1);
                 }
-                return String.join(delimiter, strings);
+                return String.join(separators[level + 1], strings);
             default:
                 throw new SeaTunnelTextFormatException(
                         CommonErrorCode.UNSUPPORTED_DATA_TYPE,
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/exception/FileConnectorErrorCode.java
 
b/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/constant/TextFormatConstant.java
similarity index 51%
copy from 
seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/exception/FileConnectorErrorCode.java
copy to 
seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/constant/TextFormatConstant.java
index e966ebcd5..76e115d3e 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/exception/FileConnectorErrorCode.java
+++ 
b/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/constant/TextFormatConstant.java
@@ -15,30 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.seatunnel.file.exception;
+package org.apache.seatunnel.format.text.constant;
 
-import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+/** Constants for the text format; this class only holds static values. */
+public class TextFormatConstant {
 
-public enum FileConnectorErrorCode implements SeaTunnelErrorCode {
-    FILE_TYPE_INVALID("FILE-01", "File type is invalid"),
-    DATA_DESERIALIZE_FAILED("FILE-02", "Data deserialization failed"),
-    FILE_LIST_GET_FAILED("FILE-03", "Get file list failed");
+    /**
+     * The seven control characters U+0001 through U+0007, in ascending order.
+     *
+     * <p>NOTE(review): presumably the default separator table of the text schemas, indexed
+     * by nesting level - confirm against the builders. The array itself is mutable and
+     * public, so take a copy before changing any element.
+     */
+    public static final String[] SEPARATOR =
+            new String[] {"\u0001", "\u0002", "\u0003", "\u0004", "\u0005", 
"\u0006", "\u0007"};
 
-    private final String code;
-    private final String description;
+    /**
+     * The control character U+0008.
+     *
+     * <p>NOTE(review): its use is not visible in this part of the patch - confirm at the
+     * call sites.
+     */
+    public static final String PLACEHOLDER = "\u0008";
 
-    FileConnectorErrorCode(String code, String description) {
-        this.code = code;
-        this.description = description;
-    }
-
-    @Override
-    public String getCode() {
-        return code;
-    }
-
-    @Override
-    public String getDescription() {
-        return description;
-    }
+    /** Private so the constants holder cannot be instantiated from outside. */
+    private TextFormatConstant() {}
 }
diff --git 
a/seatunnel-formats/seatunnel-format-text/src/test/java/org/apache/seatunnel/format/text/TextFormatSchemaTest.java
 
b/seatunnel-formats/seatunnel-format-text/src/test/java/org/apache/seatunnel/format/text/TextFormatSchemaTest.java
index 1d272797c..7d904e2c8 100644
--- 
a/seatunnel-formats/seatunnel-format-text/src/test/java/org/apache/seatunnel/format/text/TextFormatSchemaTest.java
+++ 
b/seatunnel-formats/seatunnel-format-text/src/test/java/org/apache/seatunnel/format/text/TextFormatSchemaTest.java
@@ -32,26 +32,37 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Map;
 
 public class TextFormatSchemaTest {
+    // Serialized form of one row, used as the input of the round trip below.
+    // Top-level fields are separated by U+0001. Inside a top-level array or map, elements
+    // and entries are separated by U+0002 and a map key is separated from its value by
+    // U+0003. The trailing row field sits one level deeper: its two fields are separated by
+    // U+0002, its array elements and map entries by U+0003, and its map key/value pairs by
+    // U+0004.
     public String content =
-            "[1,2,3,4,5,6]#"
-                    + "{\"tyrantlucifer\":18,\"Kris\":21}#"
-                    + "tyrantlucifer#"
-                    + "true#"
-                    + "1#"
-                    + "2#"
-                    + "3#"
-                    + "4#"
-                    + "6.66#"
-                    + "7.77#"
-                    + "8.8888888#"
-                    + "#"
-                    + "tyrantlucifer#"
-                    + "2022-09-24#"
-                    + "22:45:00#"
-                    + "2022-09-24 22:45:00";
+            String.join("\u0002", Arrays.asList("1", "2", "3", "4", "5", "6"))
+                    + '\001'
+                    + "tyrantlucifer"
+                    + '\003'
+                    + "18"
+                    + '\002'
+                    + "Kris"
+                    + '\003'
+                    + "21\001"
+                    + "tyrantlucifer\001"
+                    + "true\001"
+                    + "1\001"
+                    + "2\001"
+                    + "3\001"
+                    + "4\001"
+                    + "6.66\001"
+                    + "7.77\001"
+                    + "8.8888888\001"
+                    + '\001'
+                    + "tyrantlucifer\001"
+                    + "2022-09-24\001"
+                    + "22:45:00\001"
+                    + "2022-09-24 22:45:00\001"
+                    + String.join("\u0003", Arrays.asList("1", "2", "3", "4", 
"5", "6"))
+                    + '\002'
+                    + "tyrantlucifer\00418\003Kris\00421";
 
     public SeaTunnelRowType seaTunnelRowType;
 
@@ -76,7 +87,8 @@ public class TextFormatSchemaTest {
                             "bytes_field",
                             "date_field",
                             "time_field",
-                            "timestamp_field"
+                            "timestamp_field",
+                            "row_field"
                         },
                         new SeaTunnelDataType<?>[] {
                             ArrayType.INT_ARRAY_TYPE,
@@ -94,7 +106,15 @@ public class TextFormatSchemaTest {
                             PrimitiveByteArrayType.INSTANCE,
                             LocalTimeType.LOCAL_DATE_TYPE,
                             LocalTimeType.LOCAL_TIME_TYPE,
-                            LocalTimeType.LOCAL_DATE_TIME_TYPE
+                            LocalTimeType.LOCAL_DATE_TIME_TYPE,
+                            new SeaTunnelRowType(
+                                    new String[] {
+                                        "array_field", "map_field",
+                                    },
+                                    new SeaTunnelDataType<?>[] {
+                                        ArrayType.INT_ARRAY_TYPE,
+                                        new MapType<>(BasicType.STRING_TYPE, 
BasicType.INT_TYPE),
+                                    })
                         });
     }
 
@@ -103,12 +123,12 @@ public class TextFormatSchemaTest {
         TextDeserializationSchema deserializationSchema =
                 TextDeserializationSchema.builder()
                         .seaTunnelRowType(seaTunnelRowType)
-                        .delimiter("#")
+                        .delimiter("\u0001")
                         .build();
         TextSerializationSchema serializationSchema =
                 TextSerializationSchema.builder()
                         .seaTunnelRowType(seaTunnelRowType)
-                        .delimiter("#")
+                        .delimiter("\u0001")
                         .build();
         SeaTunnelRow seaTunnelRow = 
deserializationSchema.deserialize(content.getBytes());
         String data = new String(serializationSchema.serialize(seaTunnelRow));

Reply via email to