This is an automated email from the ASF dual-hosted git repository.
davidzollo pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new b0b095eb4e [Fix][Connector-file] Fix the new schema cannot be fetched
when the parquet file is read (#10378)
b0b095eb4e is described below
commit b0b095eb4e8ece15099ea25a9da8ef3cc020e2ac
Author: JeremyXin <[email protected]>
AuthorDate: Thu Jun 11 21:20:59 2026 +0800
[Fix][Connector-file] Fix the new schema cannot be fetched when the parquet
file is read (#10378)
Co-authored-by: JeremyXin <[email protected]>
---
docs/en/connectors/source/CosFile.md | 9 +
docs/en/connectors/source/FtpFile.md | 10 +
docs/en/connectors/source/HdfsFile.md | 9 +
docs/en/connectors/source/LocalFile.md | 9 +
docs/en/connectors/source/ObsFile.md | 9 +
docs/en/connectors/source/OssFile.md | 9 +
docs/en/connectors/source/OssJindoFile.md | 9 +
docs/en/connectors/source/S3File.md | 9 +
docs/en/connectors/source/SftpFile.md | 9 +
docs/zh/connectors/source/CosFile.md | 9 +
docs/zh/connectors/source/FtpFile.md | 10 +
docs/zh/connectors/source/HdfsFile.md | 9 +
docs/zh/connectors/source/LocalFile.md | 10 +
docs/zh/connectors/source/ObsFile.md | 8 +
docs/zh/connectors/source/OssFile.md | 9 +
docs/zh/connectors/source/OssJindoFile.md | 9 +
docs/zh/connectors/source/S3File.md | 9 +
docs/zh/connectors/source/SftpFile.md | 9 +
.../file/config/FileBaseSourceOptions.java | 9 +
.../connectors/seatunnel/file/config/FileInfo.java | 53 +++++
.../file/source/reader/AbstractReadStrategy.java | 38 +++-
.../file/source/reader/ParquetReadStrategy.java | 24 ++-
.../source/reader/AbstractReadStrategyTest.java | 6 +-
.../file/source/reader/CsvReadStrategyTest.java | 33 +--
.../reader}/OrcReadStrategyTest.java | 33 +--
.../reader}/ParquetReadStrategyTest.java | 232 +++++++++++++++++----
.../reader}/ReadStrategyEncodingTest.java | 30 +--
.../file/source/reader/UpdateSyncModeTest.java | 29 +--
.../reader}/XmlReadStrategyTest.java | 27 +--
.../seatunnel/file/util/LocalFileSystemConf.java | 42 ++++
.../file/writer/CsvWriteStrategyTest.java | 9 +-
.../seatunnel/file/writer/FileSinkConfigTest.java | 6 +-
.../file/writer/OrcWriteStrategyTest.java | 5 +-
.../file/writer/ParquetTypeCoercionTest.java | 5 +-
.../writer/ParquetWriteStrategyEvolutionTest.java | 9 +-
.../file/writer/ParquetWriteStrategyTest.java | 5 +-
36 files changed, 564 insertions(+), 195 deletions(-)
diff --git a/docs/en/connectors/source/CosFile.md
b/docs/en/connectors/source/CosFile.md
index 14097b4390..4c71220b22 100644
--- a/docs/en/connectors/source/CosFile.md
+++ b/docs/en/connectors/source/CosFile.md
@@ -85,6 +85,7 @@ To use this connector you need put
hadoop-cos-{hadoop.version}-{version}.jar and
| file_filter_modified_end | string | no | -
|
| quote_char | string | no | "
|
| escape_char | string | no | -
|
+| sort_files_by_modification_time | boolean | no | false
|
### path [string]
@@ -436,6 +437,14 @@ A single character that encloses CSV fields, allowing
fields with commas, line b
A single character that allows the quote or other special characters to appear
inside a CSV field without ending the field.
+### sort_files_by_modification_time [boolean]
+
+Whether to sort files by modification time in descending order. Default is
`false`.
+
+When enabled, files will be sorted by their modification time (newest first).
This is useful when:
+- Reading files with evolving schemas and you want schema inference to use the
latest file
+- You need to process files in reverse chronological order (newest first)
+
### common options
Source plugin common parameters, please refer to [Source Common
Options](../common-options/source-common-options.md) for details.
diff --git a/docs/en/connectors/source/FtpFile.md
b/docs/en/connectors/source/FtpFile.md
index d38c18b666..a04def1680 100644
--- a/docs/en/connectors/source/FtpFile.md
+++ b/docs/en/connectors/source/FtpFile.md
@@ -89,6 +89,8 @@ If you use SeaTunnel Engine, It automatically integrated the
hadoop jar when you
| file_filter_modified_end | string | no | -
|
| quote_char | string | no | "
|
| escape_char | string | no | -
|
+| sort_files_by_modification_time | boolean | no | false
|
+| metalake_type | string | no | gravitino
|
### host [string]
@@ -537,6 +539,14 @@ A single character that encloses CSV fields, allowing
fields with commas, line b
A single character that allows the quote or other special characters to appear
inside a CSV field without ending the field.
+### sort_files_by_modification_time [boolean]
+
+Whether to sort files by modification time in descending order. Default is
`false`.
+
+When enabled, files will be sorted by their modification time (newest first).
This is useful when:
+- Reading files with evolving schemas and you want schema inference to use the
latest file
+- You need to process files in reverse chronological order (newest first)
+
### common options
Source plugin common parameters, please refer to [Source Common
Options](../common-options/source-common-options.md) for details.
diff --git a/docs/en/connectors/source/HdfsFile.md
b/docs/en/connectors/source/HdfsFile.md
index 9a4835af34..8124c1a6c6 100644
--- a/docs/en/connectors/source/HdfsFile.md
+++ b/docs/en/connectors/source/HdfsFile.md
@@ -95,6 +95,7 @@ Read data from hdfs file system.
| file_split_size | long | no | 134217728
| Split size in bytes when `enable_file_split=true`. For `text`/`csv`/`json`,
the split end will be aligned to the next `row_delimiter`. For `parquet`, the
split unit is RowGroup and will never break a RowGroup.
|
| quote_char | string | no | "
| A single character that encloses CSV fields, allowing fields with commas,
line breaks, or quotes to be read correctly.
|
| escape_char | string | no | -
| A single character that allows the quote or other special characters to
appear inside a CSV field without ending the field.
|
+| sort_files_by_modification_time | boolean | no | false
| Sort files by modification time in descending order. Enable this when
reading evolving schemas to ensure schema inference uses the latest file.
|
### file_format_type [string]
@@ -320,6 +321,14 @@ A single character that encloses CSV fields, allowing
fields with commas, line b
A single character that allows the quote or other special characters to appear
inside a CSV field without ending the field.
+### sort_files_by_modification_time [boolean]
+
+Whether to sort files by modification time in descending order. Default is
`false`.
+
+When enabled, files will be sorted by their modification time (newest first).
This is useful when:
+- Reading files with evolving schemas and you want schema inference to use the
latest file
+- You need to process files in reverse chronological order (newest first)
+
### Tips
> If you use spark/flink, In order to use this connector, You must ensure your
> spark/flink cluster already integrated hadoop. The tested hadoop version is
> 2.x. If you use SeaTunnel Engine, It automatically integrated the hadoop jar
> when you download and install SeaTunnel Engine. You can check the jar
> package under ${SEATUNNEL_HOME}/lib to confirm this.
diff --git a/docs/en/connectors/source/LocalFile.md
b/docs/en/connectors/source/LocalFile.md
index 33e4085509..44b208b8ac 100644
--- a/docs/en/connectors/source/LocalFile.md
+++ b/docs/en/connectors/source/LocalFile.md
@@ -92,6 +92,7 @@ If you use SeaTunnel Engine, It automatically integrated the
hadoop jar when you
| file_split_size | long | no | 134217728
|
| quote_char | string | no | "
|
| escape_char | string | no | -
|
+| sort_files_by_modification_time | boolean | no | false
| Sort files by modification time in descending order. Enable this when
reading evolving schemas to ensure schema inference uses the latest file.
|
### path [string]
The source file path.
@@ -534,6 +535,14 @@ A single character that encloses CSV fields, allowing
fields with commas, line b
A single character that allows the quote or other special characters to appear
inside a CSV field without ending the field.
+### sort_files_by_modification_time [boolean]
+
+Whether to sort files by modification time in descending order. Default is
`false`.
+
+When enabled, files will be sorted by their modification time (newest first).
This is useful when:
+- Reading files with evolving schemas and you want schema inference to use the
latest file
+- You need to process files in reverse chronological order (newest first)
+
### common options
Source plugin common parameters, please refer to [Source Common
Options](../common-options/source-common-options.md) for details
diff --git a/docs/en/connectors/source/ObsFile.md
b/docs/en/connectors/source/ObsFile.md
index 824c63f07c..4ecc0d5a21 100644
--- a/docs/en/connectors/source/ObsFile.md
+++ b/docs/en/connectors/source/ObsFile.md
@@ -86,6 +86,7 @@ It only supports hadoop version **2.9.X+**.
| file_filter_modified_end | string | no | - | File
modification time filter. The connector will filter some files base on the last
modification end time (not include end time). The default data format is
`yyyy-MM-dd HH:mm:ss`. |
| quote_char | string | no | " | A
single character that encloses CSV fields, allowing fields with commas, line
breaks, or quotes to be read correctly.
|
| escape_char | string | no | - | A
single character that allows the quote or other special characters to appear
inside a CSV field without ending the field.
|
+| sort_files_by_modification_time | boolean | no | false |
Sort files by modification time in descending order. Enable this when reading
evolving schemas to ensure schema inference uses the latest file.
|
### Tips
@@ -273,6 +274,14 @@ schema {
> Source plugin common parameters, please refer to [Source Common
> Options](../common-options/source-common-options.md) for details.
+#### <span id="sort_files_by_modification_time">
sort_files_by_modification_time </span>
+
+> Whether to sort files by modification time in descending order. Default is
`false`.
+>
+> When enabled, files will be sorted by their modification time (newest
first). This is useful when:
+> - Reading files with evolving schemas and you want schema inference to use
the latest file
+> - You need to process files in reverse chronological order (newest first)
+
## Task Example
### text file
diff --git a/docs/en/connectors/source/OssFile.md
b/docs/en/connectors/source/OssFile.md
index 2ed32e9ca9..8636bce177 100644
--- a/docs/en/connectors/source/OssFile.md
+++ b/docs/en/connectors/source/OssFile.md
@@ -217,6 +217,7 @@ If you assign file type to `parquet` `orc`, schema option
not required, connecto
| file_filter_modified_end | string | no | - | File
modification time filter. The connector will filter some files base on the last
modification end time (not include end time). The default data format is
`yyyy-MM-dd HH:mm:ss`.
|
| quote_char | string | no | " | A
single character that encloses CSV fields, allowing fields with commas, line
breaks, or quotes to be read correctly.
|
| escape_char | string | no | - | A
single character that allows the quote or other special characters to appear
inside a CSV field without ending the field.
|
+| sort_files_by_modification_time | boolean | no | false |
Sort files by modification time in descending order. Enable this when reading
evolving schemas to ensure schema inference uses the latest file.
|
### file_format_type [string]
@@ -282,6 +283,14 @@ A single character that encloses CSV fields, allowing
fields with commas, line b
A single character that allows the quote or other special characters to appear
inside a CSV field without ending the field.
+### sort_files_by_modification_time [boolean]
+
+Whether to sort files by modification time in descending order. Default is
`false`.
+
+When enabled, files will be sorted by their modification time (newest first).
This is useful when:
+- Reading files with evolving schemas and you want schema inference to use the
latest file
+- You need to process files in reverse chronological order (newest first)
+
### file_filter_pattern [string]
Filter pattern, which used for filtering files. If you only want to filter
based on file names, simply write the regular file names; If you want to filter
based on the file directory at the same time, the expression needs to start
with `path`.
diff --git a/docs/en/connectors/source/OssJindoFile.md
b/docs/en/connectors/source/OssJindoFile.md
index e3b9867f23..6cadda858c 100644
--- a/docs/en/connectors/source/OssJindoFile.md
+++ b/docs/en/connectors/source/OssJindoFile.md
@@ -87,6 +87,7 @@ It only supports hadoop version **2.9.X+**.
| file_filter_modified_end | string | no | -
|
| quote_char | string | no | "
|
| escape_char | string | no | -
|
+| sort_files_by_modification_time | boolean | no | false
|
### path [string]
@@ -417,6 +418,14 @@ A single character that encloses CSV fields, allowing
fields with commas, line b
A single character that allows the quote or other special characters to appear
inside a CSV field without ending the field.
+### sort_files_by_modification_time [boolean]
+
+Whether to sort files by modification time in descending order. Default is
`false`.
+
+When enabled, files will be sorted by their modification time (newest first).
This is useful when:
+- Reading files with evolving schemas and you want schema inference to use the
latest file
+- You need to process files in reverse chronological order (newest first)
+
### common options
Source plugin common parameters, please refer to [Source Common
Options](../common-options/source-common-options.md) for details.
diff --git a/docs/en/connectors/source/S3File.md
b/docs/en/connectors/source/S3File.md
index 61275f37c1..b4ccfda5e9 100644
--- a/docs/en/connectors/source/S3File.md
+++ b/docs/en/connectors/source/S3File.md
@@ -226,6 +226,7 @@ If you assign file type to `parquet` `orc`, schema option
not required, connecto
| common-options | | no | -
| Source plugin common parameters, please refer
to [Source Common Options](../common-options/source-common-options.md) for
details.
[...]
| quote_char | string | no | "
| A single character that encloses CSV fields,
allowing fields with commas, line breaks, or quotes to be read correctly.
[...]
| escape_char | string | no | -
| A single character that allows the quote or
other special characters to appear inside a CSV field without ending the field.
[...]
+| sort_files_by_modification_time | boolean | no | false
| Sort files by modification time in descending
order. Enable this when reading evolving schemas to ensure schema inference
uses the latest file.
|
### file_format_type [string]
@@ -394,6 +395,14 @@ A single character that encloses CSV fields, allowing
fields with commas, line b
A single character that allows the quote or other special characters to appear
inside a CSV field without ending the field.
+### sort_files_by_modification_time [boolean]
+
+Whether to sort files by modification time in descending order. Default is
`false`.
+
+When enabled, files will be sorted by their modification time (newest first).
This is useful when:
+- Reading files with evolving schemas and you want schema inference to use the
latest file
+- You need to process files in reverse chronological order (newest first)
+
### schema [config]
#### fields [Config]
diff --git a/docs/en/connectors/source/SftpFile.md
b/docs/en/connectors/source/SftpFile.md
index b819cb3ce5..f32adda510 100644
--- a/docs/en/connectors/source/SftpFile.md
+++ b/docs/en/connectors/source/SftpFile.md
@@ -120,6 +120,7 @@ The File does not have a specific type list, and we can
indicate which SeaTunnel
| file_filter_modified_end | string | no | -
| File modification time filter. The connector will filter some files base
on the last modification end time (not include end time). The default data
format is `yyyy-MM-dd HH:mm:ss`.
|
| quote_char | string | no | "
| A single character that encloses CSV fields, allowing fields with commas,
line breaks, or quotes to be read correctly.
|
| escape_char | string | no | -
| A single character that allows the quote or other special characters to
appear inside a CSV field without ending the field.
|
+| sort_files_by_modification_time | boolean | no | false
| Sort files by modification time in descending order. Enable this when
reading evolving schemas to ensure schema inference uses the latest file.
|
### file_filter_pattern [string]
@@ -377,6 +378,14 @@ A single character that encloses CSV fields, allowing
fields with commas, line b
A single character that allows the quote or other special characters to appear
inside a CSV field without ending the field.
+### sort_files_by_modification_time [boolean]
+
+Whether to sort files by modification time in descending order. Default is
`false`.
+
+When enabled, files will be sorted by their modification time (newest first).
This is useful when:
+- Reading files with evolving schemas and you want schema inference to use the
latest file
+- You need to process files in reverse chronological order (newest first)
+
### schema [config]
#### fields [Config]
diff --git a/docs/zh/connectors/source/CosFile.md
b/docs/zh/connectors/source/CosFile.md
index 5a2e8943bc..f9622a1ab3 100644
--- a/docs/zh/connectors/source/CosFile.md
+++ b/docs/zh/connectors/source/CosFile.md
@@ -84,6 +84,7 @@ import ChangeLog from '../changelog/connector-file-cos.md';
| file_filter_modified_end | string | 否 | - |
| quote_char | string | 否 | " |
| escape_char | string | 否 | - |
+| sort_files_by_modification_time | boolean | 否 | false |
是否按修改时间降序排序文件。启用此选项后,在读取不断演化的 schema 时可确保 schema 推断使用最新的文件。
|
### path [string]
@@ -430,6 +431,14 @@ abc.*
用于在 CSV 字段内转义引号或其他特殊字符,使其不会结束字段。
+### sort_files_by_modification_time [boolean]
+
+是否按修改时间降序排序文件。默认值为 `false`。
+
+启用后,文件将按修改时间排序(最新的在前)。适用于以下场景:
+- 读取具有不断演化的 schema 的文件,且希望 schema 推断使用最新的文件
+- 需要按时间倒序(最新的文件优先)处理文件
+
### common options
源插件常用参数,详见[源端通用选项](../common-options/source-common-options.md)。
diff --git a/docs/zh/connectors/source/FtpFile.md
b/docs/zh/connectors/source/FtpFile.md
index 64666a8c85..00cffcdfd4 100644
--- a/docs/zh/connectors/source/FtpFile.md
+++ b/docs/zh/connectors/source/FtpFile.md
@@ -85,6 +85,8 @@ import ChangeLog from '../changelog/connector-file-ftp.md';
| file_filter_modified_end | string | 否 | - |
| quote_char | string | 否 | " |
| escape_char | string | 否 | - |
+| sort_files_by_modification_time | boolean | 否 | false |
是否按修改时间降序排序文件。启用此选项后,在读取不断演化的 schema 时可确保 schema 推断使用最新的文件。
|
+
### host [string]
目标 FTP 主机地址,必填项。
@@ -506,6 +508,14 @@ compare_mode = "len_mtime"
用于在 CSV 字段内转义引号或其他特殊字符,使其不会结束字段。
+### sort_files_by_modification_time [boolean]
+
+是否按修改时间降序排序文件。默认值为 `false`。
+
+启用后,文件将按修改时间排序(最新的在前)。适用于以下场景:
+- 读取具有不断演化的 schema 的文件,且希望 schema 推断使用最新的文件
+- 需要按时间倒序(最新的文件优先)处理文件
+
### 通用选项
源插件的通用参数,详情请参考 [源通用选项](../common-options/source-common-options.md)。
diff --git a/docs/zh/connectors/source/HdfsFile.md
b/docs/zh/connectors/source/HdfsFile.md
index c2c649ffc9..e002cafec6 100644
--- a/docs/zh/connectors/source/HdfsFile.md
+++ b/docs/zh/connectors/source/HdfsFile.md
@@ -95,6 +95,7 @@ import ChangeLog from '../changelog/connector-file-hadoop.md';
| file_split_size | long | 否 | 134217728 |
`enable_file_split=true` 时生效,单位字节。`text`/`csv`/`json` 按 `file_split_size`
拆分并对齐到下一个 `row_delimiter`;`parquet` 以 RowGroup 为拆分单位,不会切开 RowGroup。
|
| quote_char | string | 否 | " | 用于包裹 CSV
字段的单字符,可保证包含逗号、换行符或引号的字段被正确解析。
|
| escape_char | string | 否 | - | 用于在 CSV
字段内转义引号或其他特殊字符,使其不会结束字段。
|
+| sort_files_by_modification_time | boolean | 否 | false |
是否按修改时间降序排序文件。启用此选项后,在读取不断演化的 schema 时可确保 schema 推断使用最新的文件。
|
### file_format_type [string]
@@ -321,6 +322,14 @@ abc.*
用于在 CSV 字段内转义引号或其他特殊字符,使其不会结束字段。
+### sort_files_by_modification_time [boolean]
+
+是否按修改时间降序排序文件。默认值为 `false`。
+
+启用后,文件将按修改时间排序(最新的在前)。适用于以下场景:
+- 读取具有不断演化的 schema 的文件,且希望 schema 推断使用最新的文件
+- 需要按时间倒序(最新的文件优先)处理文件
+
### schema [config]
仅在文件格式类型为 text、json、excel、xml 或 csv(或其他无法从元数据中读取 schema 的格式)时需要配置。
diff --git a/docs/zh/connectors/source/LocalFile.md
b/docs/zh/connectors/source/LocalFile.md
index 14f1b074be..323171bbde 100644
--- a/docs/zh/connectors/source/LocalFile.md
+++ b/docs/zh/connectors/source/LocalFile.md
@@ -92,6 +92,8 @@ import ChangeLog from '../changelog/connector-file-local.md';
| file_split_size | long | 否 | 134217728 |
| quote_char | string | 否 | - |
| escape_char | string | 否 | - |
+| sort_files_by_modification_time | boolean | 否 | false |
是否按修改时间降序排序文件。启用此选项后,在读取不断演化的 schema 时可确保 schema 推断使用最新的文件。
|
+
### path [string]
源文件路径。
@@ -534,6 +536,14 @@ compare_mode = "len_mtime"
用于在 CSV 字段内转义引号或其他特殊字符,使其不会结束字段。
+### sort_files_by_modification_time [boolean]
+
+是否按修改时间降序排序文件。默认值为 `false`。
+
+启用后,文件将按修改时间排序(最新的在前)。适用于以下场景:
+- 读取具有不断演化的 schema 的文件,且希望 schema 推断使用最新的文件
+- 需要按时间倒序(最新的文件优先)处理文件
+
### 通用选项
数据源插件通用参数,请参阅 [数据源通用选项](../common-options/source-common-options.md) 了解详情
diff --git a/docs/zh/connectors/source/ObsFile.md
b/docs/zh/connectors/source/ObsFile.md
index 6349284912..4a44ea4868 100644
--- a/docs/zh/connectors/source/ObsFile.md
+++ b/docs/zh/connectors/source/ObsFile.md
@@ -80,6 +80,7 @@ import ChangeLog from '../changelog/connector-file-obs.md';
| time_format | string | 否 | HH:mm:ss | 时间类型格式
|
| quote_char | string | 否 | " | 用于包裹 CSV
字段的单字符,可保证包含逗号、换行符或引号的字段被正确解析。 |
| escape_char | string | 否 | - | 用于在 CSV
字段内转义引号或其他特殊字符,使其不会结束字段。 |
+| sort_files_by_modification_time | boolean | 否 | false |
是否按修改时间降序排序文件。启用此选项后,在读取不断演化的 schema 时可确保 schema 推断使用最新的文件。
|
### file_format_type [string]
@@ -110,6 +111,13 @@ markdown 解析器提取各种元素,包括标题、段落、列表、代码
注意:Markdown 格式仅支持读取,不支持写入。
+### sort_files_by_modification_time [boolean]
+是否按修改时间降序排序文件。默认值为 `false`。
+
+启用后,文件将按修改时间排序(最新的在前)。适用于以下场景:
+- 读取具有不断演化的 schema 的文件,且希望 schema 推断使用最新的文件
+- 需要按时间倒序(最新的文件优先)处理文件
+
## 变更日志
<ChangeLog />
diff --git a/docs/zh/connectors/source/OssFile.md
b/docs/zh/connectors/source/OssFile.md
index aa583cb0e5..67cc8a2904 100644
--- a/docs/zh/connectors/source/OssFile.md
+++ b/docs/zh/connectors/source/OssFile.md
@@ -216,6 +216,7 @@ schema {
| file_filter_modified_end | string | 否 | - |
按照最后修改时间过滤文件。 要过滤的结束时间(不包括改时间),时间格式是:`yyyy-MM-dd HH:mm:ss`
|
| quote_char | string | 否 | " | 用于包裹 CSV
字段的单字符,可保证包含逗号、换行符或引号的字段被正确解析。
|
| escape_char | string | 否 | - | 用于在 CSV
字段内转义引号或其他特殊字符,使其不会结束字段。
|
+| sort_files_by_modification_time | boolean | 否 | false |
是否按修改时间降序排序文件。启用此选项后,在读取不断演化的 schema 时可确保 schema 推断使用最新的文件。
|
### compress_codec [string]
@@ -281,6 +282,14 @@ markdown 解析器提取各种元素,包括标题、段落、列表、代码
用于在 CSV 字段内转义引号或其他特殊字符,使其不会结束字段。
+### sort_files_by_modification_time [boolean]
+
+是否按修改时间降序排序文件。默认值为 `false`。
+
+启用后,文件将按修改时间排序(最新的在前)。适用于以下场景:
+- 读取具有不断演化的 schema 的文件,且希望 schema 推断使用最新的文件
+- 需要按时间倒序(最新的文件优先)处理文件
+
### file_filter_pattern [string]
文件过滤模式,用于过滤文件。若只想根据文件名称筛选,则直接写文件名称的正则;若同时想根据文件目录进行过滤,则表达式以`path`起始。
diff --git a/docs/zh/connectors/source/OssJindoFile.md
b/docs/zh/connectors/source/OssJindoFile.md
index 096488d268..7e68d61b21 100644
--- a/docs/zh/connectors/source/OssJindoFile.md
+++ b/docs/zh/connectors/source/OssJindoFile.md
@@ -80,6 +80,15 @@ import ChangeLog from
'../changelog/connector-file-oss-jindo.md';
| file_filter_pattern | string | 否 | - |
文件过滤模式 |
| quote_char | string | 否 | " |
用于包裹 CSV 字段的单字符,可保证包含逗号、换行符或引号的字段被正确解析。 |
| escape_char | string | 否 | - | 用于在
CSV 字段内转义引号或其他特殊字符,使其不会结束字段。 |
+| sort_files_by_modification_time | boolean | 否 | false |
是否按修改时间降序排序文件。启用此选项后,在读取不断演化的 schema 时可确保 schema 推断使用最新的文件。
|
+
+### sort_files_by_modification_time [boolean]
+
+是否按修改时间降序排序文件。默认值为 `false`。
+
+启用后,文件将按修改时间排序(最新的在前)。适用于以下场景:
+- 读取具有不断演化的 schema 的文件,且希望 schema 推断使用最新的文件
+- 需要按时间倒序(最新的文件优先)处理文件
### file_format_type [string]
diff --git a/docs/zh/connectors/source/S3File.md
b/docs/zh/connectors/source/S3File.md
index 6fae586557..6af96d6202 100644
--- a/docs/zh/connectors/source/S3File.md
+++ b/docs/zh/connectors/source/S3File.md
@@ -225,6 +225,7 @@ schema {
| common-options | | 否 | -
|
数据源插件通用参数,请参考[数据源通用选项](../common-options/source-common-options.md)了解详情。
|
| quote_char | string | 否 | "
| 用于包裹 CSV 字段的单字符,可保证包含逗号、换行符或引号的字段被正确解析。
|
| escape_char | string | 否 | -
| 用于在 CSV 字段内转义引号或其他特殊字符,使其不会结束字段。
|
+| sort_files_by_modification_time | boolean | 否 | false |
是否按修改时间降序排序文件。启用此选项后,在读取不断演化的 schema 时可确保 schema 推断使用最新的文件。
|
### delimiter/field_delimiter [string]
@@ -246,6 +247,14 @@ schema {
用于在 CSV 字段内转义引号或其他特殊字符,使其不会结束字段。
+### sort_files_by_modification_time [boolean]
+
+是否按修改时间降序排序文件。默认值为 `false`。
+
+启用后,文件将按修改时间排序(最新的在前)。适用于以下场景:
+- 读取具有不断演化的 schema 的文件,且希望 schema 推断使用最新的文件
+- 需要按时间倒序(最新的文件优先)处理文件
+
### file_filter_pattern [string]
文件过滤模式,用于过滤文件。若只想根据文件名称筛选,则直接写文件名称的正则;若同时想根据文件目录进行过滤,则表达式以`path`起始。
diff --git a/docs/zh/connectors/source/SftpFile.md
b/docs/zh/connectors/source/SftpFile.md
index 9112f06d34..c225af5a58 100644
--- a/docs/zh/connectors/source/SftpFile.md
+++ b/docs/zh/connectors/source/SftpFile.md
@@ -120,6 +120,7 @@ import ChangeLog from '../changelog/connector-file-sftp.md';
| file_filter_modified_end | string | 否 | - |
按照最后修改时间过滤文件。 要过滤的结束时间(不包括改时间),时间格式是:`yyyy-MM-dd HH:mm:ss`
|
| quote_char | string | 否 | " | 用于包裹 CSV
字段的单字符,可保证包含逗号、换行符或引号的字段被正确解析。
|
| escape_char | string | 否 | - | 用于在 CSV
字段内转义引号或其他特殊字符,使其不会结束字段。
|
+| sort_files_by_modification_time | boolean | 否 | false |
是否按修改时间降序排序文件。启用此选项后,在读取不断演化的 schema 时可确保 schema 推断使用最新的文件。
|
### file_filter_pattern [string]
@@ -384,6 +385,14 @@ compare_mode = "len_mtime"
更多信息请参考 [元数据 SPI](../../introduction/concepts/metadata-spi.md)。
+### sort_files_by_modification_time [boolean]
+
+是否按修改时间降序排序文件。默认值为 `false`。
+
+启用后,文件将按修改时间排序(最新的在前)。适用于以下场景:
+- 读取具有不断演化的 schema 的文件,且希望 schema 推断使用最新的文件
+- 需要按时间倒序(最新的文件优先)处理文件
+
## 如何创建Sftp数据同步作业
以下示例演示如何创建从sftp读取数据并在本地客户端打印的数据同步作业:
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileBaseSourceOptions.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileBaseSourceOptions.java
index 62590e5aa5..99ec07fb51 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileBaseSourceOptions.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileBaseSourceOptions.java
@@ -224,4 +224,13 @@ public class FileBaseSourceOptions extends FileBaseOptions
{
.noDefaultValue()
.withDescription(
"A single character that allows the quote or other
special characters to appear inside a CSV field without ending the field.");
+
+ public static final Option<Boolean> SORT_FILES_BY_MOD_TIME =
+ Options.key("sort_files_by_modification_time")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Sort files by modification time in descending
order. "
+ + "Enable this when reading evolving
schemas to ensure schema inference uses the latest file. "
+ + "Disabled by default to avoid
performance overhead for large file directories.");
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileInfo.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileInfo.java
new file mode 100644
index 0000000000..c7c4be78ae
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileInfo.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.config;
+
+import lombok.Getter;
+
+import java.util.Objects;
+
+/**
+ * Represents a file with its metadata information for schema inference.
+ *
+ * <p>This class is used to track file information during file listing
operations, particularly to
+ * support sorting by modification time.
+ */
+@Getter
+public class FileInfo {
+
+ private final String fileName;
+ private final long modifyTime;
+
+ public FileInfo(String fileName, long modifyTime) {
+ this.fileName = fileName;
+ this.modifyTime = modifyTime;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ FileInfo fileInfo = (FileInfo) o;
+ return modifyTime == fileInfo.modifyTime && Objects.equals(fileName,
fileInfo.fileName);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(fileName, modifyTime);
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
index 79bc6badf3..a685a69fec 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
@@ -35,6 +35,7 @@ import
org.apache.seatunnel.connectors.seatunnel.file.config.ArchiveCompressForm
import
org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSourceOptions;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileCompareMode;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileInfo;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSyncMode;
import
org.apache.seatunnel.connectors.seatunnel.file.config.FileUpdateStrategy;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
@@ -66,6 +67,7 @@ import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Comparator;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.List;
@@ -73,6 +75,7 @@ import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.regex.Pattern;
+import java.util.stream.Collectors;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
@@ -123,6 +126,8 @@ public abstract class AbstractReadStrategy implements
ReadStrategy {
protected transient HadoopFileSystemProxy targetHadoopFileSystemProxy;
protected transient boolean shareTargetFileSystemProxy;
protected transient boolean checksumUnavailableWarned;
+ protected boolean sortFilesByModTime =
+ FileBaseSourceOptions.SORT_FILES_BY_MOD_TIME.defaultValue();
private static final class UpdateModeStats {
private long scanned;
@@ -151,9 +156,15 @@ public abstract class AbstractReadStrategy implements
ReadStrategy {
@Override
public List<String> getFileNamesByPath(String path) throws IOException {
- ArrayList<String> fileNames = new ArrayList<>();
+ List<FileInfo> fileInfoList = new ArrayList<>();
UpdateModeStats updateModeStats = enableUpdateSync ? new
UpdateModeStats() : null;
- collectFileNamesByPath(path, fileNames, updateModeStats);
+ collectFileInfoByPath(path, fileInfoList, updateModeStats);
+
+ // Sort by modification time in descending order if enabled
+ if (sortFilesByModTime) {
+
fileInfoList.sort(Comparator.comparingLong(FileInfo::getModifyTime).reversed());
+ }
+
if (updateModeStats != null) {
log.info(
"Update sync mode statistics: scanned={}, skipped={},
to_sync={}",
@@ -161,19 +172,24 @@ public abstract class AbstractReadStrategy implements
ReadStrategy {
updateModeStats.skipped,
updateModeStats.scanned - updateModeStats.skipped);
}
- return fileNames;
+
+ for (FileInfo fileInfo : fileInfoList) {
+ this.fileNames.add(fileInfo.getFileName());
+ }
+
+ return
fileInfoList.stream().map(FileInfo::getFileName).collect(Collectors.toList());
}
- private void collectFileNamesByPath(
- String path, List<String> fileNames, UpdateModeStats
updateModeStats)
+ private void collectFileInfoByPath(
+ String path, List<FileInfo> fileInfoList, UpdateModeStats
updateModeStats)
throws IOException {
FileStatus[] stats = hadoopFileSystemProxy.listStatus(path);
for (FileStatus fileStatus : stats) {
if (fileStatus.isDirectory()) {
// skip hidden tmp directory, such as .hive-staging_hive
if (!fileStatus.getPath().getName().startsWith(".")) {
- collectFileNamesByPath(
- fileStatus.getPath().toString(), fileNames,
updateModeStats);
+ collectFileInfoByPath(
+ fileStatus.getPath().toString(), fileInfoList,
updateModeStats);
}
continue;
}
@@ -214,8 +230,8 @@ public abstract class AbstractReadStrategy implements
ReadStrategy {
updateModeStats.scanned++;
}
if (shouldSyncFileInUpdateMode(fileStatus)) {
- fileNames.add(filePath);
- this.fileNames.add(filePath);
+ FileInfo fileInfo = new FileInfo(filePath,
fileStatus.getModificationTime());
+ fileInfoList.add(fileInfo);
} else if (updateModeStats != null) {
updateModeStats.skipped++;
}
@@ -319,6 +335,10 @@ public abstract class AbstractReadStrategy implements
ReadStrategy {
enableSplitFile =
pluginConfig.getBoolean(FileBaseSourceOptions.ENABLE_FILE_SPLIT.key());
}
+ if
(pluginConfig.hasPath(FileBaseSourceOptions.SORT_FILES_BY_MOD_TIME.key())) {
+ sortFilesByModTime =
+
pluginConfig.getBoolean(FileBaseSourceOptions.SORT_FILES_BY_MOD_TIME.key());
+ }
if (pluginConfig.hasPath(FileBaseSourceOptions.FILE_PATH.key())
&&
pluginConfig.getValue(FileBaseSourceOptions.FILE_PATH.key()).valueType()
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
index d444dc84f7..de2bc75d69 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
@@ -91,8 +91,6 @@ public class ParquetReadStrategy extends AbstractReadStrategy
{
private static final long JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH = 2440588;
private static final String PARQUET = "Parquet";
- private int[] indexes;
-
@Override
public void read(String path, String tableId, Collector<SeaTunnelRow>
output)
throws FileConnectorException, IOException {
@@ -163,7 +161,8 @@ public class ParquetReadStrategy extends
AbstractReadStrategy {
fields = new Object[fieldsCount];
}
for (int i = 0; i < fieldsCount; i++) {
- Object data = record.get(indexes[i]);
+ String fieldName = seaTunnelRowType.getFieldName(i);
+ Object data = record.hasField(fieldName) ?
record.get(fieldName) : null;
fields[i] = resolveObject(data,
seaTunnelRowType.getFieldType(i));
}
SeaTunnelRow seaTunnelRow = new SeaTunnelRow(fields);
@@ -198,6 +197,7 @@ public class ParquetReadStrategy extends
AbstractReadStrategy {
try (ParquetReader<Group> closeableReader = reader) {
Group record;
while ((record = closeableReader.read()) != null) {
+ GroupType recordType = record.getType();
Object[] fields;
if (isMergePartition) {
int index = fieldsCount;
@@ -209,11 +209,17 @@ public class ParquetReadStrategy extends
AbstractReadStrategy {
fields = new Object[fieldsCount];
}
for (int i = 0; i < fieldsCount; i++) {
+ String fieldName = seaTunnelRowType.getFieldName(i);
+ if (!recordType.containsField(fieldName)) {
+ fields[i] = null;
+ continue;
+ }
+ int fieldIndex = recordType.getFieldIndex(fieldName);
fields[i] =
resolveGroupObject(
record,
- record.getType().getType(indexes[i]),
- indexes[i],
+ recordType.getType(fieldIndex),
+ fieldIndex,
seaTunnelRowType.getFieldType(i));
}
SeaTunnelRow seaTunnelRow = new SeaTunnelRow(fields);
@@ -765,15 +771,12 @@ public class ParquetReadStrategy extends
AbstractReadStrategy {
}
String[] fields = new String[readColumns.size()];
SeaTunnelDataType<?>[] types = new
SeaTunnelDataType[readColumns.size()];
- indexes = new int[readColumns.size()];
buildColumnsWithErrorCheck(
TablePath.DEFAULT,
IntStream.range(0, readColumns.size()).iterator(),
i -> {
fields[i] = readColumns.get(i);
Type type = originalSchema.getType(fields[i]);
- int fieldIndex = originalSchema.getFieldIndex(fields[i]);
- indexes[i] = fieldIndex;
SeaTunnelDataType<?> configDataType =
getConfigFieldType(configRowType, fields[i]);
types[i] = parquetType2SeaTunnelType(type, configDataType,
fields[i]);
@@ -781,6 +784,11 @@ public class ParquetReadStrategy extends
AbstractReadStrategy {
seaTunnelRowType = new SeaTunnelRowType(fields, types);
seaTunnelRowTypeWithPartition = mergePartitionTypes(path,
seaTunnelRowType);
+
+ log.debug(
+ "get seatunnel row type with user config: {}. path: {}",
+ getActualSeaTunnelRowTypeInfo(),
+ path);
return getActualSeaTunnelRowTypeInfo();
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategyTest.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategyTest.java
index 74b4f7abe9..626a33bbd7 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategyTest.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategyTest.java
@@ -26,7 +26,7 @@ import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import
org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSourceOptions;
-import
org.apache.seatunnel.connectors.seatunnel.file.writer.ParquetReadStrategyTest;
+import org.apache.seatunnel.connectors.seatunnel.file.util.LocalFileSystemConf;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericArray;
@@ -97,8 +97,8 @@ public class AbstractReadStrategyTest {
public void testReadDirectorySkipHiddenDirectories() throws Exception {
AutoGenerateParquetData.generateTestData();
try (ParquetReadStrategy parquetReadStrategy = new
ParquetReadStrategy(); ) {
- ParquetReadStrategyTest.LocalConf localConf =
- new
ParquetReadStrategyTest.LocalConf(FS_DEFAULT_NAME_DEFAULT);
+ LocalFileSystemConf.LocalConf localConf =
+ new LocalFileSystemConf.LocalConf(FS_DEFAULT_NAME_DEFAULT);
parquetReadStrategy.init(localConf);
List<String> list =
parquetReadStrategy.getFileNamesByPath(AutoGenerateParquetData.DATA_FILE_PATH);
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/CsvReadStrategyTest.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/CsvReadStrategyTest.java
index 681963468f..0278315adc 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/CsvReadStrategyTest.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/CsvReadStrategyTest.java
@@ -27,7 +27,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import
org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSourceOptions;
-import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import org.apache.seatunnel.connectors.seatunnel.file.util.LocalFileSystemConf;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -51,7 +51,8 @@ public class CsvReadStrategyTest {
URL resource = CsvReadStrategyTest.class.getResource("/test.csv");
String path = Paths.get(resource.toURI()).toString();
CsvReadStrategy csvReadStrategy = new CsvReadStrategy();
- LocalConf localConf = new LocalConf(FS_DEFAULT_NAME_DEFAULT);
+ LocalFileSystemConf.LocalConf localConf =
+ new LocalFileSystemConf.LocalConf(FS_DEFAULT_NAME_DEFAULT);
csvReadStrategy.init(localConf);
csvReadStrategy.getFileNamesByPath(path);
csvReadStrategy.setPluginConfig(ConfigFactory.empty());
@@ -80,7 +81,8 @@ public class CsvReadStrategyTest {
URL resource = CsvReadStrategyTest.class.getResource("/test-csv.csv");
String path = Paths.get(resource.toURI()).toString();
CsvReadStrategy csvReadStrategy = new CsvReadStrategy();
- LocalConf localConf = new LocalConf(FS_DEFAULT_NAME_DEFAULT);
+ LocalFileSystemConf.LocalConf localConf =
+ new LocalFileSystemConf.LocalConf(FS_DEFAULT_NAME_DEFAULT);
csvReadStrategy.init(localConf);
csvReadStrategy.getFileNamesByPath(path);
System.setProperty("field_delimiter", ";");
@@ -112,7 +114,8 @@ public class CsvReadStrategyTest {
CsvReadStrategyTest.class.getResource("/csv/special_quote_char_break_line.csv");
String path = Paths.get(resource.toURI()).toString();
CsvReadStrategy csvReadStrategy = new CsvReadStrategy();
- LocalConf localConf = new LocalConf(FS_DEFAULT_NAME_DEFAULT);
+ LocalFileSystemConf.LocalConf localConf =
+ new LocalFileSystemConf.LocalConf(FS_DEFAULT_NAME_DEFAULT);
csvReadStrategy.init(localConf);
csvReadStrategy.getFileNamesByPath(path);
csvReadStrategy.setPluginConfig(ConfigFactory.parseMap(getOptionsForSpecialQuoteChar()));
@@ -170,7 +173,8 @@ public class CsvReadStrategyTest {
String path = Paths.get(resource.toURI()).toString();
TestCollector testCollector;
try (CsvReadStrategy csvReadStrategy = new CsvReadStrategy()) {
- LocalConf localConf = new LocalConf(FS_DEFAULT_NAME_DEFAULT);
+ LocalFileSystemConf.LocalConf localConf =
+ new LocalFileSystemConf.LocalConf(FS_DEFAULT_NAME_DEFAULT);
csvReadStrategy.init(localConf);
csvReadStrategy.getFileNamesByPath(path);
csvReadStrategy.setPluginConfig(ConfigFactory.parseMap(csvBomOptions));
@@ -226,23 +230,4 @@ public class CsvReadStrategyTest {
return null;
}
}
-
- public static class LocalConf extends HadoopConf {
- private static final String HDFS_IMPL =
"org.apache.hadoop.fs.LocalFileSystem";
- private static final String SCHEMA = "file";
-
- public LocalConf(String hdfsNameKey) {
- super(hdfsNameKey);
- }
-
- @Override
- public String getFsHdfsImpl() {
- return HDFS_IMPL;
- }
-
- @Override
- public String getSchema() {
- return SCHEMA;
- }
- }
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/OrcReadStrategyTest.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/OrcReadStrategyTest.java
similarity index 89%
rename from
seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/OrcReadStrategyTest.java
rename to
seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/OrcReadStrategyTest.java
index b44f2c4c5d..7a4951405c 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/OrcReadStrategyTest.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/OrcReadStrategyTest.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.file.writer;
+package org.apache.seatunnel.connectors.seatunnel.file.source.reader;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
@@ -24,8 +24,7 @@ import
org.apache.seatunnel.shade.org.apache.commons.lang3.tuple.Pair;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
-import
org.apache.seatunnel.connectors.seatunnel.file.source.reader.OrcReadStrategy;
+import org.apache.seatunnel.connectors.seatunnel.file.util.LocalFileSystemConf;
import org.apache.hadoop.conf.Configuration;
import org.apache.orc.OrcFile;
@@ -58,7 +57,8 @@ public class OrcReadStrategyTest {
Assertions.assertNotNull(orcFile);
String orcFilePath = Paths.get(orcFile.toURI()).toString();
OrcReadStrategy orcReadStrategy = new OrcReadStrategy();
- LocalConf localConf = new LocalConf(FS_DEFAULT_NAME_DEFAULT);
+ LocalFileSystemConf.LocalConf localConf =
+ new LocalFileSystemConf.LocalConf(FS_DEFAULT_NAME_DEFAULT);
orcReadStrategy.init(localConf);
TestCollector testCollector = new TestCollector();
SeaTunnelRowType seaTunnelRowTypeInfo =
@@ -76,7 +76,8 @@ public class OrcReadStrategyTest {
@Test
public void testReadNotExistedFile() throws Exception {
OrcReadStrategy orcReadStrategy = new OrcReadStrategy();
- LocalConf localConf = new LocalConf(FS_DEFAULT_NAME_DEFAULT);
+ LocalFileSystemConf.LocalConf localConf =
+ new LocalFileSystemConf.LocalConf(FS_DEFAULT_NAME_DEFAULT);
orcReadStrategy.init(localConf);
Exception exception =
Assertions.assertThrows(
@@ -94,7 +95,8 @@ public class OrcReadStrategyTest {
String orcFilePath = Paths.get(orcFile.toURI()).toString();
String confPath = Paths.get(conf.toURI()).toString();
OrcReadStrategy orcReadStrategy = new OrcReadStrategy();
- LocalConf localConf = new LocalConf(FS_DEFAULT_NAME_DEFAULT);
+ LocalFileSystemConf.LocalConf localConf =
+ new LocalFileSystemConf.LocalConf(FS_DEFAULT_NAME_DEFAULT);
Config pluginConfig = ConfigFactory.parseFile(new File(confPath));
orcReadStrategy.init(localConf);
orcReadStrategy.setPluginConfig(pluginConfig);
@@ -189,23 +191,4 @@ public class OrcReadStrategyTest {
return null;
}
}
-
- public static class LocalConf extends HadoopConf {
- private static final String HDFS_IMPL =
"org.apache.hadoop.fs.LocalFileSystem";
- private static final String SCHEMA = "file";
-
- public LocalConf(String hdfsNameKey) {
- super(hdfsNameKey);
- }
-
- @Override
- public String getFsHdfsImpl() {
- return HDFS_IMPL;
- }
-
- @Override
- public String getSchema() {
- return SCHEMA;
- }
- }
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetReadStrategyTest.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategyTest.java
similarity index 83%
rename from
seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetReadStrategyTest.java
rename to
seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategyTest.java
index fb9d416bbb..d6d38f0e26 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetReadStrategyTest.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategyTest.java
@@ -15,11 +15,12 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.file.writer;
+package org.apache.seatunnel.connectors.seatunnel.file.source.reader;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
@@ -29,8 +30,8 @@ import org.apache.seatunnel.api.table.type.MapType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
-import
org.apache.seatunnel.connectors.seatunnel.file.source.reader.ParquetReadStrategy;
+import
org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSourceOptions;
+import org.apache.seatunnel.connectors.seatunnel.file.util.LocalFileSystemConf;
import org.apache.avro.Conversions;
import org.apache.avro.Schema;
@@ -88,7 +89,8 @@ public class ParquetReadStrategyTest {
Assertions.assertNotNull(resource);
String path = Paths.get(resource.toURI()).toString();
ParquetReadStrategy parquetReadStrategy = new ParquetReadStrategy();
- LocalConf localConf = new LocalConf(FS_DEFAULT_NAME_DEFAULT);
+ LocalFileSystemConf.LocalConf localConf =
+ new LocalFileSystemConf.LocalConf(FS_DEFAULT_NAME_DEFAULT);
parquetReadStrategy.init(localConf);
SeaTunnelRowType seaTunnelRowTypeInfo =
parquetReadStrategy.getSeaTunnelRowTypeInfo(path);
Assertions.assertNotNull(seaTunnelRowTypeInfo);
@@ -103,7 +105,8 @@ public class ParquetReadStrategyTest {
Assertions.assertNotNull(resource);
String path = Paths.get(resource.toURI()).toString();
ParquetReadStrategy parquetReadStrategy = new ParquetReadStrategy();
- LocalConf localConf = new LocalConf(FS_DEFAULT_NAME_DEFAULT);
+ LocalFileSystemConf.LocalConf localConf =
+ new LocalFileSystemConf.LocalConf(FS_DEFAULT_NAME_DEFAULT);
parquetReadStrategy.init(localConf);
SeaTunnelRowType seaTunnelRowTypeInfo =
parquetReadStrategy.getSeaTunnelRowTypeInfo(path);
Assertions.assertNotNull(seaTunnelRowTypeInfo);
@@ -118,7 +121,8 @@ public class ParquetReadStrategyTest {
Assertions.assertNotNull(resource);
String path = Paths.get(resource.toURI()).toString();
ParquetReadStrategy parquetReadStrategy = new ParquetReadStrategy();
- LocalConf localConf = new LocalConf(FS_DEFAULT_NAME_DEFAULT);
+ LocalFileSystemConf.LocalConf localConf =
+ new LocalFileSystemConf.LocalConf(FS_DEFAULT_NAME_DEFAULT);
parquetReadStrategy.init(localConf);
SeaTunnelRowType seaTunnelRowTypeInfo =
parquetReadStrategy.getSeaTunnelRowTypeInfo(path);
Assertions.assertNotNull(seaTunnelRowTypeInfo);
@@ -152,7 +156,8 @@ public class ParquetReadStrategyTest {
String confPath = Paths.get(conf.toURI()).toString();
Config pluginConfig = ConfigFactory.parseFile(new File(confPath));
ParquetReadStrategy parquetReadStrategy = new ParquetReadStrategy();
- LocalConf localConf = new LocalConf(FS_DEFAULT_NAME_DEFAULT);
+ LocalFileSystemConf.LocalConf localConf =
+ new LocalFileSystemConf.LocalConf(FS_DEFAULT_NAME_DEFAULT);
parquetReadStrategy.init(localConf);
parquetReadStrategy.setPluginConfig(pluginConfig);
SeaTunnelRowType seaTunnelRowTypeInfo =
parquetReadStrategy.getSeaTunnelRowTypeInfo(path);
@@ -181,7 +186,8 @@ public class ParquetReadStrategyTest {
String confPath = Paths.get(conf.toURI()).toString();
Config pluginConfig = ConfigFactory.parseFile(new File(confPath));
ParquetReadStrategy parquetReadStrategy = new ParquetReadStrategy();
- LocalConf localConf = new LocalConf(FS_DEFAULT_NAME_DEFAULT);
+ LocalFileSystemConf.LocalConf localConf =
+ new LocalFileSystemConf.LocalConf(FS_DEFAULT_NAME_DEFAULT);
parquetReadStrategy.init(localConf);
parquetReadStrategy.setPluginConfig(pluginConfig);
SeaTunnelRowType seaTunnelRowTypeInfo =
parquetReadStrategy.getSeaTunnelRowTypeInfo(path);
@@ -196,7 +202,8 @@ public class ParquetReadStrategyTest {
public void testParquetReadArray() throws Exception {
AutoGenerateParquetData.generateTestData();
ParquetReadStrategy parquetReadStrategy = new ParquetReadStrategy();
- LocalConf localConf = new LocalConf(FS_DEFAULT_NAME_DEFAULT);
+ LocalFileSystemConf.LocalConf localConf =
+ new LocalFileSystemConf.LocalConf(FS_DEFAULT_NAME_DEFAULT);
parquetReadStrategy.init(localConf);
SeaTunnelRowType seaTunnelRowTypeInfo =
parquetReadStrategy.getSeaTunnelRowTypeInfo(AutoGenerateParquetData.DATA_FILE_PATH);
@@ -231,7 +238,8 @@ public class ParquetReadStrategyTest {
}
ParquetReadStrategy parquetReadStrategy = new ParquetReadStrategy();
- LocalConf localConf = new LocalConf(FS_DEFAULT_NAME_DEFAULT);
+ LocalFileSystemConf.LocalConf localConf =
+ new LocalFileSystemConf.LocalConf(FS_DEFAULT_NAME_DEFAULT);
parquetReadStrategy.init(localConf);
SeaTunnelRowType seaTunnelRowTypeInfo =
parquetReadStrategy.getSeaTunnelRowTypeInfo(NativeParquetWriter.DATA_FILE_PATH);
@@ -247,7 +255,8 @@ public class ParquetReadStrategyTest {
NativeParquetWriterWithAvroIncompatibleColumn.generateTestData();
try {
ParquetReadStrategy parquetReadStrategy = new
ParquetReadStrategy();
- LocalConf localConf = new LocalConf(FS_DEFAULT_NAME_DEFAULT);
+ LocalFileSystemConf.LocalConf localConf =
+ new LocalFileSystemConf.LocalConf(FS_DEFAULT_NAME_DEFAULT);
parquetReadStrategy.init(localConf);
SeaTunnelRowType rowType =
parquetReadStrategy.getSeaTunnelRowTypeInfo(
@@ -274,7 +283,8 @@ public class ParquetReadStrategyTest {
NativeParquetWriterWithAvroIncompatibleColumnAndList.generateTestData();
try {
ParquetReadStrategy parquetReadStrategy = new
ParquetReadStrategy();
- LocalConf localConf = new LocalConf(FS_DEFAULT_NAME_DEFAULT);
+ LocalFileSystemConf.LocalConf localConf =
+ new LocalFileSystemConf.LocalConf(FS_DEFAULT_NAME_DEFAULT);
parquetReadStrategy.init(localConf);
SeaTunnelRowType rowType =
parquetReadStrategy.getSeaTunnelRowTypeInfo(
@@ -300,6 +310,40 @@ public class ParquetReadStrategyTest {
}
}
+ @DisabledOnOs(OS.WINDOWS)
+ @Test
+ public void testParquetReadColumnsWithNativeParquetFallback() throws
Exception {
+ NativeParquetWriterWithAvroIncompatibleColumn.generateTestData();
+ try {
+ ParquetReadStrategy parquetReadStrategy = new
ParquetReadStrategy();
+ LocalFileSystemConf.LocalConf localConf =
+ new LocalFileSystemConf.LocalConf(FS_DEFAULT_NAME_DEFAULT);
+ parquetReadStrategy.init(localConf);
+ parquetReadStrategy.setPluginConfig(
+ ConfigFactory.parseString(
+ "read_columns = [job_blue-collar, id]\n"
+ + "parse_partition_from_path = false"));
+
+ SeaTunnelRowType rowType =
+ parquetReadStrategy.getSeaTunnelRowTypeInfo(
+
NativeParquetWriterWithAvroIncompatibleColumn.DATA_FILE_PATH);
+ Assertions.assertEquals("job_blue-collar",
rowType.getFieldName(0));
+ Assertions.assertEquals("id", rowType.getFieldName(1));
+
+ TestCollector testCollector = new TestCollector();
+ parquetReadStrategy.read(
+
NativeParquetWriterWithAvroIncompatibleColumn.DATA_FILE_PATH,
+ "",
+ testCollector);
+ Assertions.assertEquals(1, testCollector.getRows().size());
+ SeaTunnelRow row = testCollector.getRows().get(0);
+ Assertions.assertEquals("engineer", row.getField(0));
+ Assertions.assertEquals(1, row.getField(1));
+ } finally {
+ NativeParquetWriterWithAvroIncompatibleColumn.deleteFile();
+ }
+ }
+
@DisabledOnOs(OS.WINDOWS)
@Test
public void testParquetWithUserConfigRowType() throws Exception {
@@ -313,7 +357,8 @@ public class ParquetReadStrategyTest {
CatalogTable catalogTable =
CatalogTableUtil.buildWithConfig(pluginConfig);
ParquetReadStrategy parquetReadStrategy = new ParquetReadStrategy();
- LocalConf localConf = new LocalConf(FS_DEFAULT_NAME_DEFAULT);
+ LocalFileSystemConf.LocalConf localConf =
+ new LocalFileSystemConf.LocalConf(FS_DEFAULT_NAME_DEFAULT);
parquetReadStrategy.init(localConf);
SeaTunnelRowType configRowType = catalogTable.getSeaTunnelRowType();
@@ -385,25 +430,6 @@ public class ParquetReadStrategyTest {
}
}
- public static class LocalConf extends HadoopConf {
- private static final String HDFS_IMPL =
"org.apache.hadoop.fs.LocalFileSystem";
- private static final String SCHEMA = "file";
-
- public LocalConf(String hdfsNameKey) {
- super(hdfsNameKey);
- }
-
- @Override
- public String getFsHdfsImpl() {
- return HDFS_IMPL;
- }
-
- @Override
- public String getSchema() {
- return SCHEMA;
- }
- }
-
public static class AutoGenerateParquetData {
public static final String DATA_FILE_PATH = "/tmp/data.parquet";
@@ -544,7 +570,8 @@ public class ParquetReadStrategyTest {
public void testParquetReadNestedArray() throws Exception {
AutoGenerateParquetDataWithNestedArray.generateTestData();
ParquetReadStrategy parquetReadStrategy = new ParquetReadStrategy();
- LocalConf localConf = new LocalConf(FS_DEFAULT_NAME_DEFAULT);
+ LocalFileSystemConf.LocalConf localConf =
+ new LocalFileSystemConf.LocalConf(FS_DEFAULT_NAME_DEFAULT);
parquetReadStrategy.init(localConf);
SeaTunnelRowType seaTunnelRowTypeInfo =
parquetReadStrategy.getSeaTunnelRowTypeInfo(
@@ -597,7 +624,8 @@ public class ParquetReadStrategyTest {
public void testParquetReadNestedBytesArray() throws Exception {
AutoGenerateParquetDataWithNestedBytesArray.generateTestData();
ParquetReadStrategy parquetReadStrategy = new ParquetReadStrategy();
- LocalConf localConf = new LocalConf(FS_DEFAULT_NAME_DEFAULT);
+ LocalFileSystemConf.LocalConf localConf =
+ new LocalFileSystemConf.LocalConf(FS_DEFAULT_NAME_DEFAULT);
parquetReadStrategy.init(localConf);
SeaTunnelRowType seaTunnelRowTypeInfo =
@@ -650,7 +678,8 @@ public class ParquetReadStrategyTest {
public void testParquetReadNestedArrayWithUserConfigRowType() throws
Exception {
AutoGenerateParquetDataWithNestedArray.generateTestData();
ParquetReadStrategy parquetReadStrategy = new ParquetReadStrategy();
- LocalConf localConf = new LocalConf(FS_DEFAULT_NAME_DEFAULT);
+ LocalFileSystemConf.LocalConf localConf =
+ new LocalFileSystemConf.LocalConf(FS_DEFAULT_NAME_DEFAULT);
parquetReadStrategy.init(localConf);
ArrayType<String[], String> stringArrayType =
ArrayType.STRING_ARRAY_TYPE;
@@ -728,7 +757,8 @@ public class ParquetReadStrategyTest {
public void testParquetReadArrayOfMap() throws Exception {
AutoGenerateParquetDataWithArrayOfMap.generateTestData();
ParquetReadStrategy parquetReadStrategy = new ParquetReadStrategy();
- LocalConf localConf = new LocalConf(FS_DEFAULT_NAME_DEFAULT);
+ LocalFileSystemConf.LocalConf localConf =
+ new LocalFileSystemConf.LocalConf(FS_DEFAULT_NAME_DEFAULT);
parquetReadStrategy.init(localConf);
SeaTunnelRowType seaTunnelRowTypeInfo =
parquetReadStrategy.getSeaTunnelRowTypeInfo(
@@ -1090,4 +1120,134 @@ public class ParquetReadStrategyTest {
}
}
}
+
+ @DisabledOnOs(OS.WINDOWS)
+ @Test
+ public void testParquetSchemaMerge() throws Exception {
+ AutoGenerateParquetDataWithSchemaMerge.generateTestData();
+
+ ParquetReadStrategy parquetReadStrategy = new ParquetReadStrategy();
+ LocalFileSystemConf.LocalConf localConf =
+ new LocalFileSystemConf.LocalConf(FS_DEFAULT_NAME_DEFAULT);
+ parquetReadStrategy.init(localConf);
+
+ Map<String, Object> props = new HashMap<>();
+ props.put(FileBaseSourceOptions.FILENAME_EXTENSION.key(), "parquet");
+ props.put(FileBaseSourceOptions.SORT_FILES_BY_MOD_TIME.key(), true);
+
+ ReadonlyConfig readonlyConfig = ReadonlyConfig.fromMap(props);
+
+ parquetReadStrategy.setPluginConfig(readonlyConfig.toConfig());
+
+ List<String> fileNames =
+ parquetReadStrategy.getFileNamesByPath(
+ AutoGenerateParquetDataWithSchemaMerge.TMP_PATH);
+ Assertions.assertNotNull(fileNames);
+ Assertions.assertEquals(2, fileNames.size());
+
+ String firstFileName = fileNames.get(0);
+ Assertions.assertNotNull(firstFileName);
+
+ SeaTunnelRowType firstFileRowType =
+
parquetReadStrategy.getSeaTunnelRowTypeInfoWithUserConfigRowType(
+ firstFileName, null);
+ Assertions.assertNotNull(firstFileRowType);
+ String[] fieldNames = firstFileRowType.getFieldNames();
+ Assertions.assertNotNull(fieldNames);
+ Assertions.assertEquals(5, fieldNames.length);
+ Assertions.assertEquals("id", fieldNames[0]);
+ Assertions.assertEquals("name", fieldNames[1]);
+ Assertions.assertEquals("age", fieldNames[2]);
+ Assertions.assertEquals("salary", fieldNames[3]);
+ Assertions.assertEquals("department", fieldNames[4]);
+
+ AutoGenerateParquetDataWithSchemaMerge.deleteFiles();
+ }
+
+ public static class AutoGenerateParquetDataWithSchemaMerge {
+
+ public static final String TMP_PATH =
"/tmp/seatunnel/parquet/schemaMerge/";
+ public static final String OLD_FILE_PATH = TMP_PATH +
"/old_data.parquet";
+ public static final String NEW_FILE_PATH = TMP_PATH +
"/new_data.parquet";
+
+ public static void generateTestData() throws IOException {
+ deleteFiles();
+
+ generateOldParquetFile();
+ generateNewParquetFile();
+ }
+
+ public static void generateOldParquetFile() throws IOException {
+ String schemaString =
+ "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
+ + "{\"name\":\"id\",\"type\":\"int\"},"
+ + "{\"name\":\"name\",\"type\":\"string\"},"
+ + "{\"name\":\"age\",\"type\":\"int\"},"
+ + "{\"name\":\"salary\",\"type\":\"float\"}"
+ + "]}";
+ Schema schema = new Schema.Parser().parse(schemaString);
+
+ Configuration conf = new Configuration();
+ Path file = new Path(OLD_FILE_PATH);
+
+ ParquetWriter<GenericRecord> writer =
+ AvroParquetWriter.<GenericRecord>builder(file)
+ .withSchema(schema)
+ .withConf(conf)
+ .withCompressionCodec(CompressionCodecName.SNAPPY)
+ .build();
+
+ GenericRecord record = new GenericData.Record(schema);
+ record.put("id", 1);
+ record.put("name", "Alice");
+ record.put("age", 30);
+ record.put("salary", 50000.0f);
+
+ writer.write(record);
+ writer.close();
+ }
+
+ public static void generateNewParquetFile() throws IOException {
+ String schemaString =
+ "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
+ + "{\"name\":\"id\",\"type\":\"int\"},"
+ + "{\"name\":\"name\",\"type\":\"string\"},"
+ + "{\"name\":\"age\",\"type\":\"int\"},"
+ + "{\"name\":\"salary\",\"type\":\"float\"},"
+ + "{\"name\":\"department\",\"type\":\"string\"}"
+ + "]}";
+ Schema schema = new Schema.Parser().parse(schemaString);
+
+ Configuration conf = new Configuration();
+ Path file = new Path(NEW_FILE_PATH);
+
+ ParquetWriter<GenericRecord> writer =
+ AvroParquetWriter.<GenericRecord>builder(file)
+ .withSchema(schema)
+ .withConf(conf)
+ .withCompressionCodec(CompressionCodecName.SNAPPY)
+ .build();
+
+ GenericRecord record = new GenericData.Record(schema);
+ record.put("id", 2);
+ record.put("name", "Bob");
+ record.put("age", 35);
+ record.put("salary", 60000.0f);
+ record.put("department", "Engineering");
+
+ writer.write(record);
+ writer.close();
+ }
+
+ public static void deleteFiles() {
+ File oldFile = new File(OLD_FILE_PATH);
+ if (oldFile.exists()) {
+ oldFile.delete();
+ }
+ File newFile = new File(NEW_FILE_PATH);
+ if (newFile.exists()) {
+ newFile.delete();
+ }
+ }
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ReadStrategyEncodingTest.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategyEncodingTest.java
similarity index 86%
rename from
seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ReadStrategyEncodingTest.java
rename to
seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategyEncodingTest.java
index d16bd96f9c..2229f61091 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ReadStrategyEncodingTest.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategyEncodingTest.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.file.writer;
+package org.apache.seatunnel.connectors.seatunnel.file.source.reader;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
@@ -25,11 +25,7 @@ import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
-import
org.apache.seatunnel.connectors.seatunnel.file.source.reader.AbstractReadStrategy;
-import
org.apache.seatunnel.connectors.seatunnel.file.source.reader.JsonReadStrategy;
-import
org.apache.seatunnel.connectors.seatunnel.file.source.reader.TextReadStrategy;
-import
org.apache.seatunnel.connectors.seatunnel.file.source.reader.XmlReadStrategy;
+import org.apache.seatunnel.connectors.seatunnel.file.util.LocalFileSystemConf;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
@@ -114,7 +110,8 @@ public class ReadStrategyEncodingTest {
String sourceFilePath = Paths.get(sourceFile.toURI()).toString();
String confPath = Paths.get(conf.toURI()).toString();
TestCollector testCollector;
- LocalConf localConf = new LocalConf(FS_DEFAULT_NAME_DEFAULT);
+ LocalFileSystemConf.LocalConf localConf =
+ new LocalFileSystemConf.LocalConf(FS_DEFAULT_NAME_DEFAULT);
Config pluginConfig = ConfigFactory.parseFile(new File(confPath));
readStrategy.setPluginConfig(pluginConfig);
readStrategy.init(localConf);
@@ -168,23 +165,4 @@ public class ReadStrategyEncodingTest {
return null;
}
}
-
- public static class LocalConf extends HadoopConf {
- private static final String HDFS_IMPL =
"org.apache.hadoop.fs.LocalFileSystem";
- private static final String SCHEMA = "file";
-
- public LocalConf(String hdfsNameKey) {
- super(hdfsNameKey);
- }
-
- @Override
- public String getFsHdfsImpl() {
- return HDFS_IMPL;
- }
-
- @Override
- public String getSchema() {
- return SCHEMA;
- }
- }
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/UpdateSyncModeTest.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/UpdateSyncModeTest.java
index 9455eeef6f..fff09538f5 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/UpdateSyncModeTest.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/UpdateSyncModeTest.java
@@ -20,8 +20,8 @@ package
org.apache.seatunnel.connectors.seatunnel.file.source.reader;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
-import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.file.util.LocalFileSystemConf;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -118,7 +118,7 @@ class UpdateSyncModeTest {
targetDir.toUri().toString(),
"distcp",
"len_mtime"));
- strategy.init(new LocalConf(FS_DEFAULT_NAME_DEFAULT));
+ strategy.init(new
LocalFileSystemConf.LocalConf(FS_DEFAULT_NAME_DEFAULT));
List<String> files =
strategy.getFileNamesByPath(sourceDir.toUri().toString());
Assertions.assertTrue(files.isEmpty(), "Target is newer with same
len -> SKIP");
@@ -144,7 +144,7 @@ class UpdateSyncModeTest {
targetDir.toUri().toString(),
"distcp",
"len_mtime"));
- strategy.init(new LocalConf(FS_DEFAULT_NAME_DEFAULT));
+ strategy.init(new
LocalFileSystemConf.LocalConf(FS_DEFAULT_NAME_DEFAULT));
List<String> files =
strategy.getFileNamesByPath(sourceDir.toUri().toString());
Assertions.assertEquals(1, files.size());
@@ -171,7 +171,7 @@ class UpdateSyncModeTest {
targetDir.toUri().toString(),
"strict",
"checksum"));
- strategy.init(new LocalConf(FS_DEFAULT_NAME_DEFAULT));
+ strategy.init(new
LocalFileSystemConf.LocalConf(FS_DEFAULT_NAME_DEFAULT));
List<String> files =
strategy.getFileNamesByPath(sourceDir.toUri().toString());
Assertions.assertTrue(files.isEmpty(), "Checksum equal -> SKIP");
@@ -195,7 +195,7 @@ class UpdateSyncModeTest {
targetDir.toUri().toString(),
"strict",
"checksum"));
- strategy.init(new LocalConf(FS_DEFAULT_NAME_DEFAULT));
+ strategy.init(new
LocalFileSystemConf.LocalConf(FS_DEFAULT_NAME_DEFAULT));
List<String> files =
strategy.getFileNamesByPath(sourceDir.toUri().toString());
Assertions.assertEquals(1, files.size());
@@ -223,23 +223,4 @@ class UpdateSyncModeTest {
configMap.put("compare_mode", compareMode);
return ConfigFactory.parseMap(configMap);
}
-
- static class LocalConf extends HadoopConf {
- private static final String HDFS_IMPL =
"org.apache.hadoop.fs.LocalFileSystem";
- private static final String SCHEMA = "file";
-
- public LocalConf(String hdfsNameKey) {
- super(hdfsNameKey);
- }
-
- @Override
- public String getFsHdfsImpl() {
- return HDFS_IMPL;
- }
-
- @Override
- public String getSchema() {
- return SCHEMA;
- }
- }
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/XmlReadStrategyTest.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/XmlReadStrategyTest.java
similarity index 89%
rename from
seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/XmlReadStrategyTest.java
rename to
seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/XmlReadStrategyTest.java
index fca8f68fd2..01ed161888 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/XmlReadStrategyTest.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/XmlReadStrategyTest.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.file.writer;
+package org.apache.seatunnel.connectors.seatunnel.file.source.reader;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
@@ -27,8 +27,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.utils.DateTimeUtils;
import org.apache.seatunnel.common.utils.DateUtils;
import org.apache.seatunnel.common.utils.TimeUtils;
-import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
-import
org.apache.seatunnel.connectors.seatunnel.file.source.reader.XmlReadStrategy;
+import org.apache.seatunnel.connectors.seatunnel.file.util.LocalFileSystemConf;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -62,7 +61,8 @@ public class XmlReadStrategyTest {
String confPath = Paths.get(conf.toURI()).toString();
Config pluginConfig = ConfigFactory.parseFile(new File(confPath));
XmlReadStrategy xmlReadStrategy = new XmlReadStrategy();
- LocalConf localConf = new LocalConf(FS_DEFAULT_NAME_DEFAULT);
+ LocalFileSystemConf.LocalConf localConf =
+ new LocalFileSystemConf.LocalConf(FS_DEFAULT_NAME_DEFAULT);
xmlReadStrategy.setPluginConfig(pluginConfig);
xmlReadStrategy.init(localConf);
List<String> fileNamesByPath =
xmlReadStrategy.getFileNamesByPath(xmlFilePath);
@@ -135,23 +135,4 @@ public class XmlReadStrategyTest {
return null;
}
}
-
- public static class LocalConf extends HadoopConf {
- private static final String HDFS_IMPL =
"org.apache.hadoop.fs.LocalFileSystem";
- private static final String SCHEMA = "file";
-
- public LocalConf(String hdfsNameKey) {
- super(hdfsNameKey);
- }
-
- @Override
- public String getFsHdfsImpl() {
- return HDFS_IMPL;
- }
-
- @Override
- public String getSchema() {
- return SCHEMA;
- }
- }
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/util/LocalFileSystemConf.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/util/LocalFileSystemConf.java
new file mode 100644
index 0000000000..a573837413
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/util/LocalFileSystemConf.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.util;
+
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+
+public class LocalFileSystemConf { // Test-tree holder for the LocalConf that the connector-file-base tests share.
+
+ public static class LocalConf extends HadoopConf { // HadoopConf variant used by the tests in place of their per-test copies.
+ private static final String HDFS_IMPL =
"org.apache.hadoop.fs.LocalFileSystem"; // Value returned by getFsHdfsImpl().
+ private static final String SCHEMA = "file"; // Value returned by getSchema().
+
+ public LocalConf(String hdfsNameKey) { // hdfsNameKey is passed to the HadoopConf constructor unchanged.
+ super(hdfsNameKey);
+ }
+
+ @Override
+ public String getFsHdfsImpl() { // Reports the Hadoop LocalFileSystem class name.
+ return HDFS_IMPL;
+ }
+
+ @Override
+ public String getSchema() { // Reports "file".
+ return SCHEMA;
+ }
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/CsvWriteStrategyTest.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/CsvWriteStrategyTest.java
index 55e33e194c..c84841a164 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/CsvWriteStrategyTest.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/CsvWriteStrategyTest.java
@@ -30,6 +30,7 @@ import
org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
import
org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig;
import
org.apache.seatunnel.connectors.seatunnel.file.sink.writer.CsvWriteStrategy;
import
org.apache.seatunnel.connectors.seatunnel.file.source.reader.CsvReadStrategy;
+import org.apache.seatunnel.connectors.seatunnel.file.util.LocalFileSystemConf;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -66,8 +67,8 @@ public class CsvWriteStrategyTest {
FileSinkConfig writeSinkConfig =
new FileSinkConfig(ReadonlyConfig.fromMap(writeConfig),
writeRowType);
CsvWriteStrategy writeStrategy = new CsvWriteStrategy(writeSinkConfig);
- ParquetReadStrategyTest.LocalConf hadoopConf =
- new ParquetReadStrategyTest.LocalConf(FS_DEFAULT_NAME_DEFAULT);
+ LocalFileSystemConf.LocalConf hadoopConf =
+ new LocalFileSystemConf.LocalConf(FS_DEFAULT_NAME_DEFAULT);
writeStrategy.setCatalogTable(
CatalogTableUtil.getCatalogTable("test", null, null, "test",
writeRowType));
writeStrategy.init(hadoopConf, "test1", "test1", 0);
@@ -129,8 +130,8 @@ public class CsvWriteStrategyTest {
FileSinkConfig writeSinkConfig =
new FileSinkConfig(ReadonlyConfig.fromMap(writeConfig),
writeRowType);
CsvWriteStrategy writeStrategy = new CsvWriteStrategy(writeSinkConfig);
- ParquetReadStrategyTest.LocalConf hadoopConf =
- new ParquetReadStrategyTest.LocalConf(FS_DEFAULT_NAME_DEFAULT);
+ LocalFileSystemConf.LocalConf hadoopConf =
+ new LocalFileSystemConf.LocalConf(FS_DEFAULT_NAME_DEFAULT);
writeStrategy.setCatalogTable(
CatalogTableUtil.getCatalogTable("test", null, null, "test",
writeRowType));
writeStrategy.init(hadoopConf, "test1", "test1", 0);
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/FileSinkConfigTest.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/FileSinkConfigTest.java
index db048b01b8..b1317c5ff7 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/FileSinkConfigTest.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/FileSinkConfigTest.java
@@ -38,7 +38,7 @@ public class FileSinkConfigTest {
@Test
public void testConfigInit() throws Exception {
- URL conf =
OrcReadStrategyTest.class.getResource("/test_write_hdfs.conf");
+ URL conf =
FileSinkConfigTest.class.getResource("/test_write_hdfs.conf");
Assertions.assertNotNull(conf);
String confPath = Paths.get(conf.toURI()).toString();
Config config = ConfigFactory.parseFile(new File(confPath));
@@ -53,7 +53,7 @@ public class FileSinkConfigTest {
@Test
public void testConfigInitDefault() throws Exception {
- URL conf =
OrcReadStrategyTest.class.getResource("/test_write_hdfs_default_format.conf");
+ URL conf =
FileSinkConfigTest.class.getResource("/test_write_hdfs_default_format.conf");
Assertions.assertNotNull(conf);
String confPath = Paths.get(conf.toURI()).toString();
Config config = ConfigFactory.parseFile(new File(confPath));
@@ -68,7 +68,7 @@ public class FileSinkConfigTest {
@Test
public void testSinkColumnsGreaterThanSource() throws Exception {
- URL conf =
OrcReadStrategyTest.class.getResource("/test_write_hive.conf");
+ URL conf =
FileSinkConfigTest.class.getResource("/test_write_hive.conf");
Assertions.assertNotNull(conf);
String confPath = Paths.get(conf.toURI()).toString();
Config config = ConfigFactory.parseFile(new File(confPath));
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/OrcWriteStrategyTest.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/OrcWriteStrategyTest.java
index fa179d5097..d75c04ea03 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/OrcWriteStrategyTest.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/OrcWriteStrategyTest.java
@@ -28,6 +28,7 @@ import
org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
import
org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig;
import
org.apache.seatunnel.connectors.seatunnel.file.sink.writer.OrcWriteStrategy;
import
org.apache.seatunnel.connectors.seatunnel.file.source.reader.OrcReadStrategy;
+import org.apache.seatunnel.connectors.seatunnel.file.util.LocalFileSystemConf;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -66,8 +67,8 @@ public class OrcWriteStrategyTest {
new FileSinkConfig(ReadonlyConfig.fromMap(writeConfig),
writeRowType);
OrcWriteStrategy writeStrategy = new OrcWriteStrategy(writeSinkConfig);
- OrcReadStrategyTest.LocalConf hadoopConf =
- new OrcReadStrategyTest.LocalConf(FS_DEFAULT_NAME_DEFAULT);
+ LocalFileSystemConf.LocalConf hadoopConf =
+ new LocalFileSystemConf.LocalConf(FS_DEFAULT_NAME_DEFAULT);
writeStrategy.setCatalogTable(
CatalogTableUtil.getCatalogTable("test", null, null, "test",
writeRowType));
writeStrategy.init(hadoopConf, "test1", "test1", 0);
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetTypeCoercionTest.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetTypeCoercionTest.java
index 006f530b50..22c645746f 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetTypeCoercionTest.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetTypeCoercionTest.java
@@ -33,6 +33,7 @@ import
org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
import
org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig;
import
org.apache.seatunnel.connectors.seatunnel.file.sink.writer.ParquetWriteStrategy;
import
org.apache.seatunnel.connectors.seatunnel.file.source.reader.ParquetReadStrategy;
+import org.apache.seatunnel.connectors.seatunnel.file.util.LocalFileSystemConf;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -218,8 +219,8 @@ public class ParquetTypeCoercionTest {
new FileSinkConfig(
ReadonlyConfig.fromConfig(ConfigFactory.parseMap(writeConfig)), rowType);
ParquetWriteStrategy strategy = new ParquetWriteStrategy(sinkConfig);
- ParquetReadStrategyTest.LocalConf hadoopConf =
- new ParquetReadStrategyTest.LocalConf(FS_DEFAULT_NAME_DEFAULT);
+ LocalFileSystemConf.LocalConf hadoopConf =
+ new LocalFileSystemConf.LocalConf(FS_DEFAULT_NAME_DEFAULT);
strategy.setCatalogTable(
CatalogTableUtil.getCatalogTable("test", null, null, testName,
rowType));
strategy.init(hadoopConf, "test-" + testName, "test-" + testName, 0);
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetWriteStrategyEvolutionTest.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetWriteStrategyEvolutionTest.java
index 67850bf2ef..0f63f7dabf 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetWriteStrategyEvolutionTest.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetWriteStrategyEvolutionTest.java
@@ -34,6 +34,7 @@ import
org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
import
org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig;
import
org.apache.seatunnel.connectors.seatunnel.file.sink.writer.ParquetWriteStrategy;
import
org.apache.seatunnel.connectors.seatunnel.file.source.reader.ParquetReadStrategy;
+import org.apache.seatunnel.connectors.seatunnel.file.util.LocalFileSystemConf;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -91,8 +92,8 @@ public class ParquetWriteStrategyEvolutionTest {
ReadonlyConfig.fromConfig(ConfigFactory.parseMap(writeConfig)),
baseRowType);
ParquetWriteStrategy strategy = new ParquetWriteStrategy(sinkConfig);
- ParquetReadStrategyTest.LocalConf hadoopConf =
- new ParquetReadStrategyTest.LocalConf(FS_DEFAULT_NAME_DEFAULT);
+ LocalFileSystemConf.LocalConf hadoopConf =
+ new LocalFileSystemConf.LocalConf(FS_DEFAULT_NAME_DEFAULT);
strategy.setCatalogTable(
CatalogTableUtil.getCatalogTable(
"test", null, null, "evolution_table", baseRowType));
@@ -194,8 +195,8 @@ public class ParquetWriteStrategyEvolutionTest {
ReadonlyConfig.fromConfig(ConfigFactory.parseMap(writeConfig)),
baseRowType);
ParquetWriteStrategy strategy = new ParquetWriteStrategy(sinkConfig);
- ParquetReadStrategyTest.LocalConf hadoopConf =
- new ParquetReadStrategyTest.LocalConf(FS_DEFAULT_NAME_DEFAULT);
+ LocalFileSystemConf.LocalConf hadoopConf =
+ new LocalFileSystemConf.LocalConf(FS_DEFAULT_NAME_DEFAULT);
strategy.setCatalogTable(
CatalogTableUtil.getCatalogTable(
"test", null, null, "evolution_table", baseRowType));
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetWriteStrategyTest.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetWriteStrategyTest.java
index 7fa6ced60d..f24def6288 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetWriteStrategyTest.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetWriteStrategyTest.java
@@ -30,6 +30,7 @@ import
org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
import
org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig;
import
org.apache.seatunnel.connectors.seatunnel.file.sink.writer.ParquetWriteStrategy;
import
org.apache.seatunnel.connectors.seatunnel.file.source.reader.ParquetReadStrategy;
+import org.apache.seatunnel.connectors.seatunnel.file.util.LocalFileSystemConf;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.hadoop.ParquetFileReader;
@@ -80,8 +81,8 @@ public class ParquetWriteStrategyTest {
FileSinkConfig writeSinkConfig =
new FileSinkConfig(ReadonlyConfig.fromMap(writeConfig),
writeRowType);
ParquetWriteStrategy writeStrategy = new
ParquetWriteStrategy(writeSinkConfig);
- ParquetReadStrategyTest.LocalConf hadoopConf =
- new ParquetReadStrategyTest.LocalConf(FS_DEFAULT_NAME_DEFAULT);
+ LocalFileSystemConf.LocalConf hadoopConf =
+ new LocalFileSystemConf.LocalConf(FS_DEFAULT_NAME_DEFAULT);
writeStrategy.setCatalogTable(
CatalogTableUtil.getCatalogTable("test", null, null, "test",
writeRowType));
writeStrategy.init(hadoopConf, "test1", "test1", 0);