This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 74db1cbaac [Feature][File] Support extract CSV files with different
columns in different order (#9064)
74db1cbaac is described below
commit 74db1cbaac8e1b913c7959d5bdc63a087aad967a
Author: litiliu <[email protected]>
AuthorDate: Fri Mar 28 22:09:11 2025 +0800
[Feature][File] Support extract CSV files with different columns in
different order (#9064)
---
docs/en/connector-v2/source/CosFile.md | 5 ++
docs/en/connector-v2/source/FtpFile.md | 5 ++
docs/en/connector-v2/source/HdfsFile.md | 1 +
docs/en/connector-v2/source/LocalFile.md | 5 ++
docs/en/connector-v2/source/OssFile.md | 2 +
docs/en/connector-v2/source/OssJindoFile.md | 1 +
docs/en/connector-v2/source/S3File.md | 2 +
docs/en/connector-v2/source/SftpFile.md | 1 +
docs/zh/connector-v2/source/CosFile.md | 54 +++++++-------
docs/zh/connector-v2/source/FtpFile.md | 58 ++++++++-------
docs/zh/connector-v2/source/HdfsFile.md | 2 +
.../file/config/BaseSourceConfigOptions.java | 7 ++
.../file/source/reader/CsvReadStrategy.java | 42 ++++++++++-
.../e2e/connector/file/local/LocalFileIT.java | 9 +++
.../src/test/resources/csv/csv_with_header1.csv | 2 +
.../src/test/resources/csv/csv_with_header2.csv | 2 +
.../resources/csv/csv_with_header_to_assert.conf | 82 ++++++++++++++++++++++
17 files changed, 228 insertions(+), 52 deletions(-)
diff --git a/docs/en/connector-v2/source/CosFile.md
b/docs/en/connector-v2/source/CosFile.md
index 6f1129adc4..92222e221f 100644
--- a/docs/en/connector-v2/source/CosFile.md
+++ b/docs/en/connector-v2/source/CosFile.md
@@ -66,6 +66,7 @@ To use this connector you need put
hadoop-cos-{hadoop.version}-{version}.jar and
| sheet_name | string | no | - |
| xml_row_tag | string | no | - |
| xml_use_attr_format | boolean | no | - |
+| csv_use_header_line | boolean | no | false |
| file_filter_pattern | string | no | - |
| filename_extension | string | no | - |
| compress_codec | string | no | none |
@@ -274,6 +275,10 @@ Only need to be configured when file_format is xml.
Specifies Whether to process data using the tag attribute format.
+### csv_use_header_line [boolean]
+
+Whether to use the header line to parse the file, only used when the
file_format is `csv` and the file contains a header line that matches RFC 4180
+
### file_filter_pattern [string]
Filter pattern, which used for filtering files.
diff --git a/docs/en/connector-v2/source/FtpFile.md
b/docs/en/connector-v2/source/FtpFile.md
index de87cc9fda..f13b86fca6 100644
--- a/docs/en/connector-v2/source/FtpFile.md
+++ b/docs/en/connector-v2/source/FtpFile.md
@@ -60,6 +60,7 @@ If you use SeaTunnel Engine, It automatically integrated the
hadoop jar when you
| sheet_name | string | no | - |
| xml_row_tag | string | no | - |
| xml_use_attr_format | boolean | no | - |
+| csv_use_header_line | boolean | no | false |
| file_filter_pattern | string | no | - |
| filename_extension | string | no | - |
| compress_codec | string | no | none |
@@ -317,6 +318,10 @@ Only need to be configured when file_format is xml.
Specifies Whether to process data using the tag attribute format.
+### csv_use_header_line [boolean]
+
+Whether to use the header line to parse the file, only used when the
file_format is `csv` and the file contains a header line that matches RFC 4180
+
### compress_codec [string]
The compress codec of files and the details that supported as the following
shown:
diff --git a/docs/en/connector-v2/source/HdfsFile.md
b/docs/en/connector-v2/source/HdfsFile.md
index 903398d829..c7cd5b8073 100644
--- a/docs/en/connector-v2/source/HdfsFile.md
+++ b/docs/en/connector-v2/source/HdfsFile.md
@@ -64,6 +64,7 @@ Read data from hdfs file system.
| sheet_name | string | no | - |
Reader the sheet of the workbook,Only used when file_format is excel.
|
| xml_row_tag | string | no | - |
Specifies the tag name of the data rows within the XML file, only used when
file_format is xml.
|
| xml_use_attr_format | boolean | no | - |
Specifies whether to process data using the tag attribute format, only used
when file_format is xml.
|
+| csv_use_header_line | boolean | no | false |
Whether to use the header line to parse the file, only used when the
file_format is `csv` and the file contains a header line that matches RFC 4180
|
| file_filter_pattern | string | no | |
Filter pattern, which used for filtering files.
|
| filename_extension | string | no | - |
Filter filename extension, which used for filtering files with specific
extension. Example: `csv` `.txt` `json` `.xml`.
|
| compress_codec | string | no | none | The
compress codec of files
|
diff --git a/docs/en/connector-v2/source/LocalFile.md
b/docs/en/connector-v2/source/LocalFile.md
index c88dda0c9b..408cfbfff3 100644
--- a/docs/en/connector-v2/source/LocalFile.md
+++ b/docs/en/connector-v2/source/LocalFile.md
@@ -61,6 +61,7 @@ If you use SeaTunnel Engine, It automatically integrated the
hadoop jar when you
| excel_engine | string | no | POI
|
| xml_row_tag | string | no | -
|
| xml_use_attr_format | boolean | no | -
|
+| csv_use_header_line | boolean | no | false
|
| file_filter_pattern | string | no | -
|
| filename_extension | string | no | -
|
| compress_codec | string | no | none
|
@@ -265,6 +266,10 @@ Only need to be configured when file_format is xml.
Specifies Whether to process data using the tag attribute format.
+### csv_use_header_line [boolean]
+
+Whether to use the header line to parse the file, only used when the
file_format is `csv` and the file contains a header line that matches RFC 4180
+
### file_filter_pattern [string]
Filter pattern, which used for filtering files.
diff --git a/docs/en/connector-v2/source/OssFile.md
b/docs/en/connector-v2/source/OssFile.md
index f6f061c1d6..bf19076c8c 100644
--- a/docs/en/connector-v2/source/OssFile.md
+++ b/docs/en/connector-v2/source/OssFile.md
@@ -194,10 +194,12 @@ If you assign file type to `parquet` `orc`, schema option
not required, connecto
| time_format | string | no | HH:mm:ss | Time
type format, used to tell connector how to convert string to time, supported as
the following formats:`HH:mm:ss` `HH:mm:ss.SSS`
|
| filename_extension | string | no | - |
Filter filename extension, which used for filtering files with specific
extension. Example: `csv` `.txt` `json` `.xml`.
|
| skip_header_row_number | long | no | 0 | Skip
the first few lines, but only for the txt and csv. For example, set like
following:`skip_header_row_number = 2`. Then SeaTunnel will skip the first 2
lines from source files
|
+| csv_use_header_line | boolean | no | false |
Whether to use the header line to parse the file, only used when the
file_format is `csv` and the file contains a header line that matches RFC 4180
|
| schema | config | no | - | The
schema of upstream data.
|
| sheet_name | string | no | - |
Reader the sheet of the workbook,Only used when file_format is excel.
|
| xml_row_tag | string | no | - |
Specifies the tag name of the data rows within the XML file, only used when
file_format is xml.
|
| xml_use_attr_format | boolean | no | - |
Specifies whether to process data using the tag attribute format, only used
when file_format is xml.
|
+| csv_use_header_line | boolean | no | false |
Whether to use the header line to parse the file, only used when the
file_format is `csv` and the file contains a header line that matches RFC 4180
|
| compress_codec | string | no | none | Which
compress codec the files used.
|
| encoding | string | no | UTF-8 |
| null_format | string | no | - | Only
used when file_format_type is text. null_format to define which strings can be
represented as null. e.g: `\N`
|
diff --git a/docs/en/connector-v2/source/OssJindoFile.md
b/docs/en/connector-v2/source/OssJindoFile.md
index b2d891d026..d173bfd7df 100644
--- a/docs/en/connector-v2/source/OssJindoFile.md
+++ b/docs/en/connector-v2/source/OssJindoFile.md
@@ -70,6 +70,7 @@ It only supports hadoop version **2.9.X+**.
| sheet_name | string | no | - |
| xml_row_tag | string | no | - |
| xml_use_attr_format | boolean | no | - |
+| csv_use_header_line | boolean | no | false |
| file_filter_pattern | string | no | |
| compress_codec | string | no | none |
| archive_compress_codec | string | no | none |
diff --git a/docs/en/connector-v2/source/S3File.md
b/docs/en/connector-v2/source/S3File.md
index a5e5270307..ae82ca1133 100644
--- a/docs/en/connector-v2/source/S3File.md
+++ b/docs/en/connector-v2/source/S3File.md
@@ -201,10 +201,12 @@ If you assign file type to `parquet` `orc`, schema option
not required, connecto
| datetime_format | string | no | yyyy-MM-dd HH:mm:ss
| Datetime type format, used to tell connector
how to convert string to datetime, supported as the following
formats:`yyyy-MM-dd HH:mm:ss` `yyyy.MM.dd HH:mm:ss` `yyyy/MM/dd HH:mm:ss`
`yyyyMMddHHmmss`
[...]
| time_format | string | no | HH:mm:ss
| Time type format, used to tell connector how
to convert string to time, supported as the following formats:`HH:mm:ss`
`HH:mm:ss.SSS`
[...]
| skip_header_row_number | long | no | 0
| Skip the first few lines, but only for the
txt and csv. For example, set like following:`skip_header_row_number = 2`. Then
SeaTunnel will skip the first 2 lines from source files
[...]
+| csv_use_header_line | boolean | no | false
| Whether to use the header line to parse the
file, only used when the file_format is `csv` and the file contains the header
line that matches RFC 4180
|
| schema | config | no | -
| The schema of upstream data.
[...]
| sheet_name | string | no | -
| Reader the sheet of the workbook,Only used
when file_format is excel.
[...]
| xml_row_tag | string | no | -
| Specifies the tag name of the data rows
within the XML file, only valid for XML files.
[...]
| xml_use_attr_format | boolean | no | -
| Specifies whether to process data using the
tag attribute format, only valid for XML files.
[...]
+| csv_use_header_line | boolean | no | false
| Whether to use the header line to parse the
file, only used when the file_format is `csv` and the file contains the header
line that matches RFC 4180
|
| compress_codec | string | no | none
|
[...]
| archive_compress_codec | string | no | none
|
[...]
| encoding | string | no | UTF-8
|
[...]
diff --git a/docs/en/connector-v2/source/SftpFile.md
b/docs/en/connector-v2/source/SftpFile.md
index c270a2516a..b6745cc681 100644
--- a/docs/en/connector-v2/source/SftpFile.md
+++ b/docs/en/connector-v2/source/SftpFile.md
@@ -93,6 +93,7 @@ The File does not have a specific type list, and we can
indicate which SeaTunnel
| sheet_name | String | No | - |
Reader the sheet of the workbook,Only used when file_format is excel.
|
| xml_row_tag | string | no | - |
Specifies the tag name of the data rows within the XML file, only used when
file_format is xml.
|
| xml_use_attr_format | boolean | no | - |
Specifies whether to process data using the tag attribute format, only used
when file_format is xml.
|
+| csv_use_header_line | boolean | no | false |
Whether to use the header line to parse the file, only used when the
file_format is `csv` and the file contains a header line that matches RFC 4180
|
| schema | Config | No | - |
Please check #schema below
|
| compress_codec | String | No | None | The
compress codec of files and the details that supported as the following shown:
<br/> - txt: `lzo` `None` <br/> - json: `lzo` `None` <br/> - csv: `lzo` `None`
<br/> - orc: `lzo` `snappy` `lz4` `zlib` `None` <br/> - parquet: `lzo` `snappy`
`lz4` `gzip` `brotli` `zstd` `None` <br/> Tips: excel type does Not support any
compression format |
| archive_compress_codec | string | no | none |
diff --git a/docs/zh/connector-v2/source/CosFile.md
b/docs/zh/connector-v2/source/CosFile.md
index 584ce02f45..b160eafecd 100644
--- a/docs/zh/connector-v2/source/CosFile.md
+++ b/docs/zh/connector-v2/source/CosFile.md
@@ -47,30 +47,31 @@ import ChangeLog from '../changelog/connector-file-cos.md';
## 选项
-|名称 | 类型 | 必需 | 默认值 |
-|---------------------------|---------|---------|---------------------|
-| path | string | 是 | - |
-| file_format_type | string | 是 | - |
-| bucket | string | 是 | - |
-| secret_id | string | 是 | - |
-| secret_key | string | 是 | - |
-| region | string | 是 | - |
-| read_columns | list | 是 | - |
-| delimiter/field_delimiter | string | 否 | \001 |
-| parse_partition_from_path | boolean | 否 | true |
-| skip_header_row_number | long | 否 | 0 |
-| date_format | string | 否 | yyyy-MM-dd |
-| datetime_format | string | 否 | yyyy-MM-dd HH:mm:ss |
-| time_format | string | 否 | HH:mm:ss |
-| schema | config | 否 | - |
-| sheet_name | string | 否 | - |
-| xml_row_tag | string | 否 | - |
-| xml_use_attr_format | boolean | 否 | - |
-| file_filter_pattern | string | 否 | |
-| compress_codec | string | 否 | none |
-| archive_compress_codec | string | 否 | none |
-| encoding | string | 否 | UTF-8 |
-| common-options | | 否 | - |
+| 名称 | 类型 | 必需 | 默认值 |
+|---------------------------------------|---------|-----|---------------------|
+| path | string | 是 | - |
+| file_format_type | string | 是 | - |
+| bucket | string | 是 | - |
+| secret_id | string | 是 | - |
+| secret_key | string | 是 | - |
+| region | string | 是 | - |
+| read_columns | list | 是 | - |
+| delimiter/field_delimiter | string | 否 | \001 |
+| parse_partition_from_path | boolean | 否 | true |
+| skip_header_row_number | long | 否 | 0 |
+| date_format | string | 否 | yyyy-MM-dd |
+| datetime_format | string | 否 | yyyy-MM-dd HH:mm:ss |
+| time_format | string | 否 | HH:mm:ss |
+| schema | config | 否 | - |
+| sheet_name | string | 否 | - |
+| xml_row_tag | string | 否 | - |
+| xml_use_attr_format | boolean | 否 | - |
+| csv_use_header_line | boolean | 否 | false |
+| file_filter_pattern | string | 否 | |
+| compress_codec | string | 否 | none |
+| archive_compress_codec | string | 否 | none |
+| encoding | string | 否 | UTF-8 |
+| common-options | | 否 | - |
### path [string]
@@ -271,6 +272,11 @@ default `HH:mm:ss`
仅当file_format为xml时才需要配置。
指定是否使用标记属性格式处理数据。
+### csv_use_header_line [boolean]
+
+仅在文件格式为 csv 时可以选择配置。
+是否使用标题行来解析文件,标题行与 RFC 4180 匹配
+
### file_filter_pattern [string]
过滤模式,用于过滤文件。
diff --git a/docs/zh/connector-v2/source/FtpFile.md
b/docs/zh/connector-v2/source/FtpFile.md
index 6aa1b9178d..c44a12c38c 100644
--- a/docs/zh/connector-v2/source/FtpFile.md
+++ b/docs/zh/connector-v2/source/FtpFile.md
@@ -39,32 +39,33 @@ import ChangeLog from '../changelog/connector-file-ftp.md';
## 配置项
-| 名称 | 类型 | 是否必填 | 默认值 |
-|---------------------------|---------|----------|---------------------|
-| host | string | 是 | - |
-| port | int | 是 | - |
-| user | string | 是 | - |
-| password | string | 是 | - |
-| path | string | 是 | - |
-| file_format_type | string | 是 | - |
-| connection_mode | string | 否 | active_local |
-| delimiter/field_delimiter | string | 否 | \001 |
-| read_columns | list | 否 | - |
-| parse_partition_from_path | boolean | 否 | true |
-| date_format | string | 否 | yyyy-MM-dd |
-| datetime_format | string | 否 | yyyy-MM-dd HH:mm:ss |
-| time_format | string | 否 | HH:mm:ss |
-| skip_header_row_number | long | 否 | 0 |
-| schema | config | 否 | - |
-| sheet_name | string | 否 | - |
-| xml_row_tag | string | 否 | - |
-| xml_use_attr_format | boolean | 否 | - |
-| file_filter_pattern | string | 否 | - |
-| compress_codec | string | 否 | none |
-| archive_compress_codec | string | 否 | none |
-| encoding | string | 否 | UTF-8 |
-| null_format | string | 否 | - |
-| common-options | | 否 | - |
+| 名称 | 类型 | 是否必填 | 默认值 |
+|---------------------------|---------|-------|---------------------|
+| host | string | 是 | - |
+| port | int | 是 | - |
+| user | string | 是 | - |
+| password | string | 是 | - |
+| path | string | 是 | - |
+| file_format_type | string | 是 | - |
+| connection_mode | string | 否 | active_local |
+| delimiter/field_delimiter | string | 否 | \001 |
+| read_columns | list | 否 | - |
+| parse_partition_from_path | boolean | 否 | true |
+| date_format | string | 否 | yyyy-MM-dd |
+| datetime_format | string | 否 | yyyy-MM-dd HH:mm:ss |
+| time_format | string | 否 | HH:mm:ss |
+| skip_header_row_number | long | 否 | 0 |
+| schema | config | 否 | - |
+| sheet_name | string | 否 | - |
+| xml_row_tag | string | 否 | - |
+| xml_use_attr_format | boolean | 否 | - |
+| csv_use_header_line | boolean | 否 | false |
+| file_filter_pattern | string | 否 | - |
+| compress_codec | string | 否 | none |
+| archive_compress_codec | string | 否 | none |
+| encoding | string | 否 | UTF-8 |
+| null_format | string | 否 | - |
+| common-options | | 否 | - |
### host [string]
@@ -313,6 +314,11 @@ SeaTunnel 将从源文件中跳过前 2 行。
指定是否使用标签属性格式处理数据。
+### csv_use_header_line [boolean]
+
+仅在文件格式为 csv 时可以选择配置。
+是否使用标题行来解析文件,标题行与 RFC 4180 匹配
+
### compress_codec [string]
文件的压缩编解码器,支持的详细信息如下:
diff --git a/docs/zh/connector-v2/source/HdfsFile.md
b/docs/zh/connector-v2/source/HdfsFile.md
index cf9c7c3687..4fa6993748 100644
--- a/docs/zh/connector-v2/source/HdfsFile.md
+++ b/docs/zh/connector-v2/source/HdfsFile.md
@@ -64,6 +64,8 @@ import ChangeLog from '../changelog/connector-file-hadoop.md';
| sheet_name | string | 否 | - |
读取工作簿的表格,仅在文件格式为 excel 时使用。
|
| compress_codec | string | 否 | none | 文件的压缩编解码器。
|
| common-options | | 否 | - | 源插件通用参数,请参阅
[源通用选项](../../../en/connector-v2/source-common-options.md) 获取详细信息。
|
+| csv_use_header_line | boolean | 否 | false |
是否使用标题行来解析文件,仅当 file_format 为 `csv` 且文件包含与 RFC 4180 匹配的标题行时使用
|
+
### delimiter/field_delimiter [string]
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfigOptions.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfigOptions.java
index 3474538f74..b656f555e0 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfigOptions.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfigOptions.java
@@ -129,6 +129,13 @@ public class BaseSourceConfigOptions {
.defaultValue(0L)
.withDescription("The number of rows to skip");
+ public static final Option<Boolean> CSV_USE_HEADER_LINE =
+ Options.key("csv_use_header_line")
+ .booleanType()
+ .defaultValue(Boolean.FALSE)
+ .withDescription(
+ "whether to use the header line to parse the file,
only used when the file_format is csv");
+
public static final Option<List<String>> READ_PARTITIONS =
Options.key("read_partitions")
.listType()
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/CsvReadStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/CsvReadStrategy.java
index cc5cb8820e..4cc7fb2628 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/CsvReadStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/CsvReadStrategy.java
@@ -51,8 +51,10 @@ import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.stream.Collectors;
@Slf4j
public class CsvReadStrategy extends AbstractReadStrategy {
@@ -67,6 +69,7 @@ public class CsvReadStrategy extends AbstractReadStrategy {
private int[] indexes;
private String encoding = BaseSourceConfigOptions.ENCODING.defaultValue();
private CatalogTable inputCatalogTable;
+ private boolean firstLineAsHeader =
BaseSourceConfigOptions.CSV_USE_HEADER_LINE.defaultValue();
@Override
public void read(String path, String tableId, Collector<SeaTunnelRow>
output)
@@ -102,9 +105,19 @@ public class CsvReadStrategy extends AbstractReadStrategy {
}
CSVFormat csvFormat = CSVFormat.DEFAULT;
+ if (firstLineAsHeader) {
+ csvFormat = csvFormat.withFirstRecordAsHeader();
+ }
try (BufferedReader reader =
new BufferedReader(new
InputStreamReader(actualInputStream, encoding));
CSVParser csvParser = new CSVParser(reader, csvFormat); ) {
+ // test and skip `\uFEFF` BOM
+ reader.mark(1);
+ int firstChar = reader.read();
+ if (firstChar != 0xFEFF) {
+ reader.reset();
+ }
+ // skip lines
for (int i = 0; i < skipHeaderNumber; i++) {
if (reader.readLine() == null) {
throw new IOException(
@@ -114,10 +127,18 @@ public class CsvReadStrategy extends AbstractReadStrategy
{
}
}
// read lines
+ List<String> headers = getHeaders(csvParser);
for (CSVRecord csvRecord : csvParser) {
HashMap<Integer, String> fieldIdValueMap = new HashMap<>();
- for (int i = 0; i <
inputCatalogTable.getTableSchema().getColumns().size(); i++) {
- fieldIdValueMap.put(i, csvRecord.get(i));
+ for (int i = 0; i < headers.size(); i++) {
+ // the user input schema may not contain all the columns
in the csv header
+ // and may contain columns in a different order with the
csv header
+ int index =
+
inputCatalogTable.getSeaTunnelRowType().indexOf(headers.get(i), false);
+ if (index == -1) {
+ continue;
+ }
+ fieldIdValueMap.put(index, csvRecord.get(i));
}
SeaTunnelRow seaTunnelRow =
deserializationSchema.getSeaTunnelRow(fieldIdValueMap);
if (!readColumns.isEmpty()) {
@@ -152,6 +173,19 @@ public class CsvReadStrategy extends AbstractReadStrategy {
}
}
+ private List<String> getHeaders(CSVParser csvParser) {
+ List<String> headers;
+ if (firstLineAsHeader) {
+ headers =
csvParser.getHeaderNames().stream().collect(Collectors.toList());
+ } else {
+ headers =
+ inputCatalogTable.getTableSchema().getColumns().stream()
+ .map(column -> column.getName())
+ .collect(Collectors.toList());
+ }
+ return headers;
+ }
+
@Override
public SeaTunnelRowType getSeaTunnelRowTypeInfo(String path) {
this.seaTunnelRowType = CatalogTableUtil.buildSimpleTextSchema();
@@ -205,6 +239,10 @@ public class CsvReadStrategy extends AbstractReadStrategy {
readonlyConfig
.getOptional(BaseSourceConfigOptions.NULL_FORMAT)
.orElse(null));
+ if
(pluginConfig.hasPath(BaseSourceConfigOptions.CSV_USE_HEADER_LINE.key())) {
+ firstLineAsHeader =
+
pluginConfig.getBoolean(BaseSourceConfigOptions.CSV_USE_HEADER_LINE.key());
+ }
if (isMergePartition) {
deserializationSchema =
builder.seaTunnelRowType(userDefinedRowTypeWithPartition).build();
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java
index fc69268584..63b94d82f6 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java
@@ -290,6 +290,14 @@ public class LocalFileIT extends TestSuiteBase {
"/csv/break_line.csv",
"/seatunnel/read/csv/break_line/break_line.csv",
container);
+ ContainerUtil.copyFileIntoContainers(
+ "/csv/csv_with_header1.csv",
+ "/seatunnel/read/csv/header/csv_with_header1.csv",
+ container);
+ ContainerUtil.copyFileIntoContainers(
+ "/csv/csv_with_header2.csv",
+ "/seatunnel/read/csv/header/csv_with_header2.csv",
+ container);
ContainerUtil.copyFileIntoContainers(
"/text/e2e_null_format.txt",
@@ -305,6 +313,7 @@ public class LocalFileIT extends TestSuiteBase {
TestHelper helper = new TestHelper(container);
helper.execute("/csv/fake_to_local_csv.conf");
helper.execute("/csv/local_csv_to_assert.conf");
+ helper.execute("/csv/csv_with_header_to_assert.conf");
helper.execute("/csv/breakline_csv_to_assert.conf");
helper.execute("/excel/fake_to_local_excel.conf");
helper.execute("/excel/local_excel_to_assert.conf");
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/csv/csv_with_header1.csv
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/csv/csv_with_header1.csv
new file mode 100644
index 0000000000..25b892a001
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/csv/csv_with_header1.csv
@@ -0,0 +1,2 @@
+name,id,is_female
+tom,20,true
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/csv/csv_with_header2.csv
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/csv/csv_with_header2.csv
new file mode 100644
index 0000000000..63c1d91e60
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/csv/csv_with_header2.csv
@@ -0,0 +1,2 @@
+name,is_female,id
+tommy,false,30
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/csv/csv_with_header_to_assert.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/csv/csv_with_header_to_assert.conf
new file mode 100644
index 0000000000..b0aaa6d6b2
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/csv/csv_with_header_to_assert.conf
@@ -0,0 +1,82 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ LocalFile {
+ path = "/seatunnel/read/csv/header"
+ file_format_type = csv
+ csv_use_header_line = true
+ schema = {
+ fields {
+ id = int
+ name = string
+ is_female = boolean
+ }
+ }
+ }
+}
+
+sink {
+ Assert {
+ rules {
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 2
+ }
+ {
+ rule_type = MIN_ROW
+ rule_value = 2
+ }
+ ]
+ field_rules = [
+ {
+ field_name = id
+ field_type = int
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ {
+ field_name = name
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ {
+ field_name = is_female
+ field_type = boolean
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ }
+ }
+}