This is an automated email from the ASF dual-hosted git repository.

davidzollo pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new b0b095eb4e [Fix][Connector-file] Fix the new schema cannot be fetched 
when the parquet file is read (#10378)
b0b095eb4e is described below

commit b0b095eb4e8ece15099ea25a9da8ef3cc020e2ac
Author: JeremyXin <[email protected]>
AuthorDate: Thu Jun 11 21:20:59 2026 +0800

    [Fix][Connector-file] Fix the new schema cannot be fetched when the parquet 
file is read (#10378)
    
    Co-authored-by: JeremyXin <[email protected]>
---
 docs/en/connectors/source/CosFile.md               |   9 +
 docs/en/connectors/source/FtpFile.md               |  10 +
 docs/en/connectors/source/HdfsFile.md              |   9 +
 docs/en/connectors/source/LocalFile.md             |   9 +
 docs/en/connectors/source/ObsFile.md               |   9 +
 docs/en/connectors/source/OssFile.md               |   9 +
 docs/en/connectors/source/OssJindoFile.md          |   9 +
 docs/en/connectors/source/S3File.md                |   9 +
 docs/en/connectors/source/SftpFile.md              |   9 +
 docs/zh/connectors/source/CosFile.md               |   9 +
 docs/zh/connectors/source/FtpFile.md               |  10 +
 docs/zh/connectors/source/HdfsFile.md              |   9 +
 docs/zh/connectors/source/LocalFile.md             |  10 +
 docs/zh/connectors/source/ObsFile.md               |   8 +
 docs/zh/connectors/source/OssFile.md               |   9 +
 docs/zh/connectors/source/OssJindoFile.md          |   9 +
 docs/zh/connectors/source/S3File.md                |   9 +
 docs/zh/connectors/source/SftpFile.md              |   9 +
 .../file/config/FileBaseSourceOptions.java         |   9 +
 .../connectors/seatunnel/file/config/FileInfo.java |  53 +++++
 .../file/source/reader/AbstractReadStrategy.java   |  38 +++-
 .../file/source/reader/ParquetReadStrategy.java    |  24 ++-
 .../source/reader/AbstractReadStrategyTest.java    |   6 +-
 .../file/source/reader/CsvReadStrategyTest.java    |  33 +--
 .../reader}/OrcReadStrategyTest.java               |  33 +--
 .../reader}/ParquetReadStrategyTest.java           | 232 +++++++++++++++++----
 .../reader}/ReadStrategyEncodingTest.java          |  30 +--
 .../file/source/reader/UpdateSyncModeTest.java     |  29 +--
 .../reader}/XmlReadStrategyTest.java               |  27 +--
 .../seatunnel/file/util/LocalFileSystemConf.java   |  42 ++++
 .../file/writer/CsvWriteStrategyTest.java          |   9 +-
 .../seatunnel/file/writer/FileSinkConfigTest.java  |   6 +-
 .../file/writer/OrcWriteStrategyTest.java          |   5 +-
 .../file/writer/ParquetTypeCoercionTest.java       |   5 +-
 .../writer/ParquetWriteStrategyEvolutionTest.java  |   9 +-
 .../file/writer/ParquetWriteStrategyTest.java      |   5 +-
 36 files changed, 564 insertions(+), 195 deletions(-)

diff --git a/docs/en/connectors/source/CosFile.md 
b/docs/en/connectors/source/CosFile.md
index 14097b4390..4c71220b22 100644
--- a/docs/en/connectors/source/CosFile.md
+++ b/docs/en/connectors/source/CosFile.md
@@ -85,6 +85,7 @@ To use this connector you need put 
hadoop-cos-{hadoop.version}-{version}.jar and
 | file_filter_modified_end   | string  | no       | -                          
 | 
 | quote_char                 | string  | no       | "                          
 |
 | escape_char                | string  | no       | -                          
 |
+| sort_files_by_modification_time | boolean | no       | false                 
      |
 
 ### path [string]
 
@@ -436,6 +437,14 @@ A single character that encloses CSV fields, allowing 
fields with commas, line b
 
 A single character that allows the quote or other special characters to appear 
inside a CSV field without ending the field.
 
+### sort_files_by_modification_time [boolean]
+
+Whether to sort files by modification time in descending order. Default is 
`false`.
+
+When enabled, files will be sorted by their modification time (newest first). 
This is useful when:
+- Reading files with evolving schemas and you want schema inference to use the 
latest file
+- You need to process files in reverse chronological order (newest first)
+
 ### common options
 
 Source plugin common parameters, please refer to [Source Common 
Options](../common-options/source-common-options.md) for details.
diff --git a/docs/en/connectors/source/FtpFile.md 
b/docs/en/connectors/source/FtpFile.md
index d38c18b666..a04def1680 100644
--- a/docs/en/connectors/source/FtpFile.md
+++ b/docs/en/connectors/source/FtpFile.md
@@ -89,6 +89,8 @@ If you use SeaTunnel Engine, It automatically integrated the 
hadoop jar when you
 | file_filter_modified_end    | string  | no       | -                         
  | 
 | quote_char                  | string  | no       | "                         
  |
 | escape_char                 | string  | no       | -                         
  |
+| sort_files_by_modification_time | boolean | no       | false                 
      |
+| metalake_type               | string  | no       | gravitino                 
  |
 
 ### host [string]
 
@@ -537,6 +539,14 @@ A single character that encloses CSV fields, allowing 
fields with commas, line b
 
 A single character that allows the quote or other special characters to appear 
inside a CSV field without ending the field.
 
+### sort_files_by_modification_time [boolean]
+
+Whether to sort files by modification time in descending order. Default is 
`false`.
+
+When enabled, files will be sorted by their modification time (newest first). 
This is useful when:
+- Reading files with evolving schemas and you want schema inference to use the 
latest file
+- You need to process files in reverse chronological order (newest first)
+
 ### common options
 
 Source plugin common parameters, please refer to [Source Common 
Options](../common-options/source-common-options.md) for details.
diff --git a/docs/en/connectors/source/HdfsFile.md 
b/docs/en/connectors/source/HdfsFile.md
index 9a4835af34..8124c1a6c6 100644
--- a/docs/en/connectors/source/HdfsFile.md
+++ b/docs/en/connectors/source/HdfsFile.md
@@ -95,6 +95,7 @@ Read data from hdfs file system.
 | file_split_size            | long    | no       | 134217728                  
 | Split size in bytes when `enable_file_split=true`. For `text`/`csv`/`json`, 
the split end will be aligned to the next `row_delimiter`. For `parquet`, the 
split unit is RowGroup and will never break a RowGroup.                         
                                                                                
                  |
 | quote_char                 | string  | no       | "                          
 | A single character that encloses CSV fields, allowing fields with commas, 
line breaks, or quotes to be read correctly.                                    
                                                                                
                                                                                
                    |
 | escape_char                | string  | no       | -                          
 | A single character that allows the quote or other special characters to 
appear inside a CSV field without ending the field.                             
                                                                                
                                                                                
                      |
+| sort_files_by_modification_time | boolean | no   | false                     
  | Sort files by modification time in descending order. Enable this when 
reading evolving schemas to ensure schema inference uses the latest file.       
                                                                                
                        |
 
 ### file_format_type [string]
 
@@ -320,6 +321,14 @@ A single character that encloses CSV fields, allowing 
fields with commas, line b
 
 A single character that allows the quote or other special characters to appear 
inside a CSV field without ending the field.
 
+### sort_files_by_modification_time [boolean]
+
+Whether to sort files by modification time in descending order. Default is 
`false`.
+
+When enabled, files will be sorted by their modification time (newest first). 
This is useful when:
+- Reading files with evolving schemas and you want schema inference to use the 
latest file
+- You need to process files in reverse chronological order (newest first)
+
 ### Tips
 
 > If you use spark/flink, In order to use this connector, You must ensure your 
 > spark/flink cluster already integrated hadoop. The tested hadoop version is 
 > 2.x. If you use SeaTunnel Engine, It automatically integrated the hadoop jar 
 > when you download and install SeaTunnel Engine. You can check the jar 
 > package under ${SEATUNNEL_HOME}/lib to confirm this.
diff --git a/docs/en/connectors/source/LocalFile.md 
b/docs/en/connectors/source/LocalFile.md
index 33e4085509..44b208b8ac 100644
--- a/docs/en/connectors/source/LocalFile.md
+++ b/docs/en/connectors/source/LocalFile.md
@@ -92,6 +92,7 @@ If you use SeaTunnel Engine, It automatically integrated the 
hadoop jar when you
 | file_split_size            | long    | no       | 134217728                  
          |
 | quote_char                 | string  | no       | "                          
          |
 | escape_char                | string  | no       | -                          
          |
+| sort_files_by_modification_time | boolean | no       | false                 
      | Sort files by modification time in descending order. Enable this when 
reading evolving schemas to ensure schema inference uses the latest file.       
                                                                                
                        |
 ### path [string]
 
 The source file path.
@@ -534,6 +535,14 @@ A single character that encloses CSV fields, allowing 
fields with commas, line b
 
 A single character that allows the quote or other special characters to appear 
inside a CSV field without ending the field.
 
+### sort_files_by_modification_time [boolean]
+
+Whether to sort files by modification time in descending order. Default is 
`false`.
+
+When enabled, files will be sorted by their modification time (newest first). 
This is useful when:
+- Reading files with evolving schemas and you want schema inference to use the 
latest file
+- You need to process files in reverse chronological order (newest first)
+
 ### common options
 
 Source plugin common parameters, please refer to [Source Common 
Options](../common-options/source-common-options.md) for details
diff --git a/docs/en/connectors/source/ObsFile.md 
b/docs/en/connectors/source/ObsFile.md
index 824c63f07c..4ecc0d5a21 100644
--- a/docs/en/connectors/source/ObsFile.md
+++ b/docs/en/connectors/source/ObsFile.md
@@ -86,6 +86,7 @@ It only supports hadoop version **2.9.X+**.
 | file_filter_modified_end   | string  | no       | -                   | File 
modification time filter. The connector will filter some files base on the last 
modification end time (not include end time). The default data format is 
`yyyy-MM-dd HH:mm:ss`. |
 | quote_char                 | string  | no       | "                   | A 
single character that encloses CSV fields, allowing fields with commas, line 
breaks, or quotes to be read correctly.                                         
                      |
 | escape_char                | string  | no       | -                   | A 
single character that allows the quote or other special characters to appear 
inside a CSV field without ending the field.                                    
                      |
+| sort_files_by_modification_time | boolean | no       | false               | 
Sort files by modification time in descending order. Enable this when reading 
evolving schemas to ensure schema inference uses the latest file.               
                                                                                
                |
 
 ### Tips
 
@@ -273,6 +274,14 @@ schema {
 
 > Source plugin common parameters, please refer to [Source Common 
 > Options](../common-options/source-common-options.md) for details.
 
+#### <span id="sort_files_by_modification_time"> 
sort_files_by_modification_time </span>
+
+> Whether to sort files by modification time in descending order. Default is 
`false`.
+>
+> When enabled, files will be sorted by their modification time (newest 
first). This is useful when:
+> - Reading files with evolving schemas and you want schema inference to use 
the latest file
+> - You need to process files in reverse chronological order (newest first)
+
 ## Task Example
 
 ### text file
diff --git a/docs/en/connectors/source/OssFile.md 
b/docs/en/connectors/source/OssFile.md
index 2ed32e9ca9..8636bce177 100644
--- a/docs/en/connectors/source/OssFile.md
+++ b/docs/en/connectors/source/OssFile.md
@@ -217,6 +217,7 @@ If you assign file type to `parquet` `orc`, schema option 
not required, connecto
 | file_filter_modified_end   | string  | no       | -                   | File 
modification time filter. The connector will filter some files base on the last 
modification end time (not include end time). The default data format is 
`yyyy-MM-dd HH:mm:ss`.                                                          
                                                                                
      |
 | quote_char                 | string  | no       | "                   | A 
single character that encloses CSV fields, allowing fields with commas, line 
breaks, or quotes to be read correctly.                                         
                                                                                
                                                                                
     |
 | escape_char                | string  | no       | -                   | A 
single character that allows the quote or other special characters to appear 
inside a CSV field without ending the field.                                    
                                                                                
                                                                                
     |
+| sort_files_by_modification_time | boolean | no       | false               | 
Sort files by modification time in descending order. Enable this when reading 
evolving schemas to ensure schema inference uses the latest file.               
                                                                                
                |
 
 ### file_format_type [string]
 
@@ -282,6 +283,14 @@ A single character that encloses CSV fields, allowing 
fields with commas, line b
 
 A single character that allows the quote or other special characters to appear 
inside a CSV field without ending the field.
 
+### sort_files_by_modification_time [boolean]
+
+Whether to sort files by modification time in descending order. Default is 
`false`.
+
+When enabled, files will be sorted by their modification time (newest first). 
This is useful when:
+- Reading files with evolving schemas and you want schema inference to use the 
latest file
+- You need to process files in reverse chronological order (newest first)
+
 ### file_filter_pattern [string]
 
 Filter pattern, which used for filtering files.  If you only want to filter 
based on file names, simply write the regular file names; If you want to filter 
based on the file directory at the same time, the expression needs to start 
with `path`.
diff --git a/docs/en/connectors/source/OssJindoFile.md 
b/docs/en/connectors/source/OssJindoFile.md
index e3b9867f23..6cadda858c 100644
--- a/docs/en/connectors/source/OssJindoFile.md
+++ b/docs/en/connectors/source/OssJindoFile.md
@@ -87,6 +87,7 @@ It only supports hadoop version **2.9.X+**.
 | file_filter_modified_end   | string  | no       | -                          
 | 
 | quote_char                 | string  | no       | "                          
 | 
 | escape_char                | string  | no       | -                          
 |
+| sort_files_by_modification_time | boolean | no       | false                 
      |
 
 ### path [string]
 
@@ -417,6 +418,14 @@ A single character that encloses CSV fields, allowing 
fields with commas, line b
 
 A single character that allows the quote or other special characters to appear 
inside a CSV field without ending the field.
 
+### sort_files_by_modification_time [boolean]
+
+Whether to sort files by modification time in descending order. Default is 
`false`.
+
+When enabled, files will be sorted by their modification time (newest first). 
This is useful when:
+- Reading files with evolving schemas and you want schema inference to use the 
latest file
+- You need to process files in reverse chronological order (newest first)
+
 ### common options
 
 Source plugin common parameters, please refer to [Source Common 
Options](../common-options/source-common-options.md) for details.
diff --git a/docs/en/connectors/source/S3File.md 
b/docs/en/connectors/source/S3File.md
index 61275f37c1..b4ccfda5e9 100644
--- a/docs/en/connectors/source/S3File.md
+++ b/docs/en/connectors/source/S3File.md
@@ -226,6 +226,7 @@ If you assign file type to `parquet` `orc`, schema option 
not required, connecto
 | common-options                  |         | no       | -                     
                                | Source plugin common parameters, please refer 
to [Source Common Options](../common-options/source-common-options.md) for 
details.                                                                        
                                                                                
                                                                                
                   [...]
 | quote_char                      | string  | no       | "                     
                                | A single character that encloses CSV fields, 
allowing fields with commas, line breaks, or quotes to be read correctly.       
                                                                                
                                                                                
                                                                                
               [...]
 | escape_char                     | string  | no       | -                     
                                | A single character that allows the quote or 
other special characters to appear inside a CSV field without ending the field. 
                                                                                
                                                                                
                                                                                
                [...]
+| sort_files_by_modification_time | boolean | no       | false                 
                                | Sort files by modification time in descending 
order. Enable this when reading evolving schemas to ensure schema inference 
uses the latest file.                                                           
                                                                                
                                    |
 
 ### file_format_type [string]
 
@@ -394,6 +395,14 @@ A single character that encloses CSV fields, allowing 
fields with commas, line b
 
 A single character that allows the quote or other special characters to appear 
inside a CSV field without ending the field.
 
+### sort_files_by_modification_time [boolean]
+
+Whether to sort files by modification time in descending order. Default is 
`false`.
+
+When enabled, files will be sorted by their modification time (newest first). 
This is useful when:
+- Reading files with evolving schemas and you want schema inference to use the 
latest file
+- You need to process files in reverse chronological order (newest first)
+
 ### schema [config]
 
 #### fields [Config]
diff --git a/docs/en/connectors/source/SftpFile.md 
b/docs/en/connectors/source/SftpFile.md
index b819cb3ce5..f32adda510 100644
--- a/docs/en/connectors/source/SftpFile.md
+++ b/docs/en/connectors/source/SftpFile.md
@@ -120,6 +120,7 @@ The File does not have a specific type list, and we can 
indicate which SeaTunnel
 | file_filter_modified_end   | string  | no       | -                          
   | File modification time filter. The connector will filter some files base 
on the last modification end time (not include end time). The default data 
format is `yyyy-MM-dd HH:mm:ss`.                                                
                                                                                
                                                            |
 | quote_char                 | string  | no       | "                          
   | A single character that encloses CSV fields, allowing fields with commas, 
line breaks, or quotes to be read correctly.                                    
                                                                                
                                                                                
                                                      |
 | escape_char                | string  | no       | -                          
   | A single character that allows the quote or other special characters to 
appear inside a CSV field without ending the field.                             
                                                                                
                                                                                
                                                        |
+| sort_files_by_modification_time | boolean | no       | false                 
      | Sort files by modification time in descending order. Enable this when 
reading evolving schemas to ensure schema inference uses the latest file.       
                                                                                
                        |
 
 ### file_filter_pattern [string]
 
@@ -377,6 +378,14 @@ A single character that encloses CSV fields, allowing 
fields with commas, line b
 
 A single character that allows the quote or other special characters to appear 
inside a CSV field without ending the field.
 
+### sort_files_by_modification_time [boolean]
+
+Whether to sort files by modification time in descending order. Default is 
`false`.
+
+When enabled, files will be sorted by their modification time (newest first). 
This is useful when:
+- Reading files with evolving schemas and you want schema inference to use the 
latest file
+- You need to process files in reverse chronological order (newest first)
+
 ### schema [config]
 
 #### fields [Config]
diff --git a/docs/zh/connectors/source/CosFile.md 
b/docs/zh/connectors/source/CosFile.md
index 5a2e8943bc..f9622a1ab3 100644
--- a/docs/zh/connectors/source/CosFile.md
+++ b/docs/zh/connectors/source/CosFile.md
@@ -84,6 +84,7 @@ import ChangeLog from '../changelog/connector-file-cos.md';
 | file_filter_modified_end   | string  | 否  | -                   |
 | quote_char                 | string  | 否  | "                   | 
 | escape_char                | string  | 否  | -                   |
+| sort_files_by_modification_time | boolean | 否 | false               | 
是否按修改时间降序排序文件。启用此选项后,在读取不断演化的 schema 时可确保 schema 推断使用最新的文件。                     
                                                                                
                 |
 
 ### path [string]
 
@@ -430,6 +431,14 @@ abc.*
 
 用于在 CSV 字段内转义引号或其他特殊字符,使其不会结束字段。
 
+### sort_files_by_modification_time [boolean]
+
+是否按修改时间降序排序文件。默认值为 `false`。
+
+启用后,文件将按修改时间排序(最新的在前)。适用于以下场景:
+- 读取具有不断演化的 schema 的文件,且希望 schema 推断使用最新的文件
+- 需要按从新到旧的时间顺序处理文件
+
 ### common options
 
 源插件常用参数,详见[源端通用选项](../common-options/source-common-options.md)。
diff --git a/docs/zh/connectors/source/FtpFile.md 
b/docs/zh/connectors/source/FtpFile.md
index 64666a8c85..00cffcdfd4 100644
--- a/docs/zh/connectors/source/FtpFile.md
+++ b/docs/zh/connectors/source/FtpFile.md
@@ -85,6 +85,8 @@ import ChangeLog from '../changelog/connector-file-ftp.md';
 | file_filter_modified_end    | string  | 否    | -                   | 
 | quote_char                  | string  | 否    | "                   | 
 | escape_char                 | string  | 否    | -                   |
+| sort_files_by_modification_time | boolean | 否 | false               | 
是否按修改时间降序排序文件。启用此选项后,在读取不断演化的 schema 时可确保 schema 推断使用最新的文件。                     
                                                                                
                 |
+
 ### host [string]
 
 目标 FTP 主机地址,必填项。
@@ -506,6 +508,14 @@ compare_mode = "len_mtime"
 
 用于在 CSV 字段内转义引号或其他特殊字符,使其不会结束字段。
 
+### sort_files_by_modification_time [boolean]
+
+是否按修改时间降序排序文件。默认值为 `false`。
+
+启用后,文件将按修改时间排序(最新的在前)。适用于以下场景:
+- 读取具有不断演化的 schema 的文件,且希望 schema 推断使用最新的文件
+- 需要按从新到旧的时间顺序处理文件
+
 ### 通用选项
 
 源插件的通用参数,详情请参考 [源通用选项](../common-options/source-common-options.md)。
diff --git a/docs/zh/connectors/source/HdfsFile.md 
b/docs/zh/connectors/source/HdfsFile.md
index c2c649ffc9..e002cafec6 100644
--- a/docs/zh/connectors/source/HdfsFile.md
+++ b/docs/zh/connectors/source/HdfsFile.md
@@ -95,6 +95,7 @@ import ChangeLog from '../changelog/connector-file-hadoop.md';
 | file_split_size            | long    | 否    | 134217728           | 
`enable_file_split=true` 时生效,单位字节。`text`/`csv`/`json` 按 `file_split_size` 
拆分并对齐到下一个 `row_delimiter`;`parquet` 以 RowGroup 为拆分单位,不会切开 RowGroup。             
                                   |
 | quote_char                 | string  | 否    | "                   | 用于包裹 CSV 
字段的单字符,可保证包含逗号、换行符或引号的字段被正确解析。                                                  
                                                                                
        |
 | escape_char                | string  | 否    | -                   | 用于在 CSV 
字段内转义引号或其他特殊字符,使其不会结束字段。                                                        
                                                                                
         |
+| sort_files_by_modification_time | boolean | 否 | false               | 
是否按修改时间降序排序文件。启用此选项后,在读取不断演化的 schema 时可确保 schema 推断使用最新的文件。                     
                                                                                
                 |
 
 ### file_format_type [string]
 
@@ -321,6 +322,14 @@ abc.*
 
 用于在 CSV 字段内转义引号或其他特殊字符,使其不会结束字段。
 
+### sort_files_by_modification_time [boolean]
+
+是否按修改时间降序排序文件。默认值为 `false`。
+
+启用后,文件将按修改时间排序(最新的在前)。适用于以下场景:
+- 读取具有不断演化的 schema 的文件,且希望 schema 推断使用最新的文件
+- 需要按从新到旧的时间顺序处理文件
+
 ### schema [config]
 
 仅在文件格式类型为 text、json、excel、xml 或 csv(或其他无法从元数据中读取 schema 的格式)时需要配置。
diff --git a/docs/zh/connectors/source/LocalFile.md 
b/docs/zh/connectors/source/LocalFile.md
index 14f1b074be..323171bbde 100644
--- a/docs/zh/connectors/source/LocalFile.md
+++ b/docs/zh/connectors/source/LocalFile.md
@@ -92,6 +92,8 @@ import ChangeLog from '../changelog/connector-file-local.md';
 | file_split_size            | long    | 否    | 134217728           | 
 | quote_char                 | string  | 否    | -                   | 
 | escape_char                | string  | 否    | -                   |
+| sort_files_by_modification_time | boolean | 否 | false               | 
是否按修改时间降序排序文件。启用此选项后,在读取不断演化的 schema 时可确保 schema 推断使用最新的文件。                     
                                                                                
                 |
+
 ### path [string]
 
 源文件路径。
@@ -534,6 +536,14 @@ compare_mode = "len_mtime"
 
 用于在 CSV 字段内转义引号或其他特殊字符,使其不会结束字段。
 
+### sort_files_by_modification_time [boolean]
+
+是否按修改时间降序排序文件。默认值为 `false`。
+
+启用后,文件将按修改时间排序(最新的在前)。适用于以下场景:
+- 读取具有不断演化的 schema 的文件,且希望 schema 推断使用最新的文件
+- 需要按从新到旧的时间顺序处理文件
+
 ### 通用选项
 
 数据源插件通用参数,请参阅 [数据源通用选项](../common-options/source-common-options.md) 了解详情
diff --git a/docs/zh/connectors/source/ObsFile.md 
b/docs/zh/connectors/source/ObsFile.md
index 6349284912..4a44ea4868 100644
--- a/docs/zh/connectors/source/ObsFile.md
+++ b/docs/zh/connectors/source/ObsFile.md
@@ -80,6 +80,7 @@ import ChangeLog from '../changelog/connector-file-obs.md';
 | time_format               | string  | 否  | HH:mm:ss            | 时间类型格式      
                            |
 | quote_char                | string  | 否  | "                   | 用于包裹 CSV 
字段的单字符,可保证包含逗号、换行符或引号的字段被正确解析。 |
 | escape_char               | string  | 否  | -                   | 用于在 CSV 
字段内转义引号或其他特殊字符,使其不会结束字段。        |
+| sort_files_by_modification_time | boolean | 否 | false               | 
是否按修改时间降序排序文件。启用此选项后,在读取不断演化的 schema 时可确保 schema 推断使用最新的文件。                     
                                                                                
                 |
 
 ### file_format_type [string]
 
@@ -110,6 +111,13 @@ markdown 解析器提取各种元素,包括标题、段落、列表、代码
 
 注意:Markdown 格式仅支持读取,不支持写入。
 
+### sort_files_by_modification_time [boolean]
+是否按修改时间降序排序文件。默认值为 `false`。
+
+启用后,文件将按修改时间排序(最新的在前)。适用于以下场景:
+- 读取具有不断演化的 schema 的文件,且希望 schema 推断使用最新的文件
+- 需要按从新到旧的时间顺序处理文件
+
 ## 变更日志
 
 <ChangeLog />
diff --git a/docs/zh/connectors/source/OssFile.md 
b/docs/zh/connectors/source/OssFile.md
index aa583cb0e5..67cc8a2904 100644
--- a/docs/zh/connectors/source/OssFile.md
+++ b/docs/zh/connectors/source/OssFile.md
@@ -216,6 +216,7 @@ schema {
 | file_filter_modified_end   | string  | 否    | -                  | 
按照最后修改时间过滤文件。 要过滤的结束时间(不包括改时间),时间格式是:`yyyy-MM-dd HH:mm:ss`                      
                                                                     |
 | quote_char                 | string  | 否    | "                   | 用于包裹 CSV 
字段的单字符,可保证包含逗号、换行符或引号的字段被正确解析。                                                  
                                                            |
 | escape_char                | string  | 否    | -                  | 用于在 CSV 
字段内转义引号或其他特殊字符,使其不会结束字段。                                                        
                                                             |
+| sort_files_by_modification_time | boolean | 否 | false               | 
是否按修改时间降序排序文件。启用此选项后,在读取不断演化的 schema 时可确保 schema 推断使用最新的文件。                     
                                                                                
                 |
 
 ### compress_codec [string]
 
@@ -281,6 +282,14 @@ markdown 解析器提取各种元素,包括标题、段落、列表、代码
 
 用于在 CSV 字段内转义引号或其他特殊字符,使其不会结束字段。
 
+### sort_files_by_modification_time [boolean]
+
+是否按修改时间降序排序文件。默认值为 `false`。
+
+启用后,文件将按修改时间排序(最新的在前)。适用于以下场景:
+- 读取具有不断演化的 schema 的文件,且希望 schema 推断使用最新的文件
+- 需要按从新到旧的时间顺序处理文件
+
 ### file_filter_pattern [string]
 
 文件过滤模式,用于过滤文件。若只想根据文件名称筛选,则直接写文件名称的正则;若同时想根据文件目录进行过滤,则表达式以`path`起始。
diff --git a/docs/zh/connectors/source/OssJindoFile.md 
b/docs/zh/connectors/source/OssJindoFile.md
index 096488d268..7e68d61b21 100644
--- a/docs/zh/connectors/source/OssJindoFile.md
+++ b/docs/zh/connectors/source/OssJindoFile.md
@@ -80,6 +80,15 @@ import ChangeLog from 
'../changelog/connector-file-oss-jindo.md';
 | file_filter_pattern       | string  | 否  | -                           | 
文件过滤模式                                                                        |
 | quote_char                | string  | 否  | "                           | 
用于包裹 CSV 字段的单字符,可保证包含逗号、换行符或引号的字段被正确解析。                                       |
 | escape_char               | string  | 否  | -                           | 用于在 
CSV 字段内转义引号或其他特殊字符,使其不会结束字段。                                              |
+| sort_files_by_modification_time | boolean | 否 | false               | 
是否按修改时间降序排序文件。启用此选项后,在读取不断演化的 schema 时可确保 schema 推断使用最新的文件。                     
                                                                                
                 |
+
+### sort_files_by_modification_time [boolean]
+
+是否按修改时间降序排序文件。默认值为 `false`。
+
+启用后,文件将按修改时间排序(最新的在前)。适用于以下场景:
+- 读取具有不断演化的 schema 的文件,且希望 schema 推断使用最新的文件
+- 需要按从新到旧的时间顺序处理文件
 
 ### file_format_type [string]
 
diff --git a/docs/zh/connectors/source/S3File.md 
b/docs/zh/connectors/source/S3File.md
index 6fae586557..6af96d6202 100644
--- a/docs/zh/connectors/source/S3File.md
+++ b/docs/zh/connectors/source/S3File.md
@@ -225,6 +225,7 @@ schema {
 | common-options                  |         | 否    | -                         
                            | 
数据源插件通用参数,请参考[数据源通用选项](../common-options/source-common-options.md)了解详情。         
                                                                                
                                                                                
                                                                                
     |
 | quote_char                      | string  | 否    | "                         
                            | 用于包裹 CSV 字段的单字符,可保证包含逗号、换行符或引号的字段被正确解析。           
                                                                                
                                                                                
                                                                                
                    |
 | escape_char                     | string  | 否    | -                         
                            | 用于在 CSV 字段内转义引号或其他特殊字符,使其不会结束字段。                  
                                                                                
                                                                                
                                                                                
                    |
+| sort_files_by_modification_time | boolean | 否 | false               | 
是否按修改时间降序排序文件。启用此选项后,在读取不断演化的 schema 时可确保 schema 推断使用最新的文件。                     
                                                                                
                 |
 
 ### delimiter/field_delimiter [string]
 
@@ -246,6 +247,14 @@ schema {
 
 用于在 CSV 字段内转义引号或其他特殊字符,使其不会结束字段。
 
+### sort_files_by_modification_time [boolean]
+
+是否按修改时间降序排序文件。默认值为 `false`。
+
+启用后,文件将按修改时间排序(最新的在前)。适用于以下场景:
+- 读取具有不断演化的 schema 的文件,且希望 schema 推断使用最新的文件
+- 需要按时间顺序处理文件
+
 ### file_filter_pattern [string]
 
 文件过滤模式,用于过滤文件。若只想根据文件名称筛选,则直接写文件名称的正则;若同时想根据文件目录进行过滤,则表达式以`path`起始。
diff --git a/docs/zh/connectors/source/SftpFile.md 
b/docs/zh/connectors/source/SftpFile.md
index 9112f06d34..c225af5a58 100644
--- a/docs/zh/connectors/source/SftpFile.md
+++ b/docs/zh/connectors/source/SftpFile.md
@@ -120,6 +120,7 @@ import ChangeLog from '../changelog/connector-file-sftp.md';
 | file_filter_modified_end   | string  | 否    | -                   | 
按照最后修改时间过滤文件。 要过滤的结束时间(不包括改时间),时间格式是:`yyyy-MM-dd HH:mm:ss`                      
                                                                                
                                                                                
   |
 | quote_char                 | string  | 否    | "                   | 用于包裹 CSV 
字段的单字符,可保证包含逗号、换行符或引号的字段被正确解析。                                                  
                                                                                
                                                                          |
 | escape_char                | string  | 否    | -                   | 用于在 CSV 
字段内转义引号或其他特殊字符,使其不会结束字段。                                                        
                                                                                
                                                                           |
+| sort_files_by_modification_time | boolean | 否 | false               | 
是否按修改时间降序排序文件。启用此选项后,在读取不断演化的 schema 时可确保 schema 推断使用最新的文件。                     
                                                                                
                 |
 
 ### file_filter_pattern [string]
 
@@ -384,6 +385,14 @@ compare_mode = "len_mtime"
 
 更多信息请参考 [元数据 SPI](../../introduction/concepts/metadata-spi.md)。
 
+### sort_files_by_modification_time [boolean]
+
+是否按修改时间降序排序文件。默认值为 `false`。
+
+启用后,文件将按修改时间排序(最新的在前)。适用于以下场景:
+- 读取具有不断演化的 schema 的文件,且希望 schema 推断使用最新的文件
+- 需要按时间顺序处理文件
+
 ## 如何创建Sftp数据同步作业
 
 以下示例演示如何创建从sftp读取数据并在本地客户端打印的数据同步作业:
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileBaseSourceOptions.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileBaseSourceOptions.java
index 62590e5aa5..99ec07fb51 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileBaseSourceOptions.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileBaseSourceOptions.java
@@ -224,4 +224,13 @@ public class FileBaseSourceOptions extends FileBaseOptions 
{
                     .noDefaultValue()
                     .withDescription(
                             "A single character that allows the quote or other 
special characters to appear inside a CSV field without ending the field.");
+
+    public static final Option<Boolean> SORT_FILES_BY_MOD_TIME =
+            Options.key("sort_files_by_modification_time")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Sort files by modification time in descending 
order. "
+                                    + "Enable this when reading evolving 
schemas to ensure schema inference uses the latest file. "
+                                    + "Disabled by default to avoid 
performance overhead for large file directories.");
 }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileInfo.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileInfo.java
new file mode 100644
index 0000000000..c7c4be78ae
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileInfo.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.config;
+
+import lombok.Getter;
+
+import java.util.Objects;
+
+/**
+ * Represents a file with its metadata information for schema inference.
+ *
+ * <p>This class is used to track file information during file listing 
operations, particularly to
+ * support sorting by modification time.
+ */
+@Getter
+public class FileInfo {
+
+    private final String fileName;
+    private final long modifyTime;
+
+    public FileInfo(String fileName, long modifyTime) {
+        this.fileName = fileName;
+        this.modifyTime = modifyTime;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        FileInfo fileInfo = (FileInfo) o;
+        return modifyTime == fileInfo.modifyTime && Objects.equals(fileName, 
fileInfo.fileName);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(fileName, modifyTime);
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
index 79bc6badf3..a685a69fec 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
@@ -35,6 +35,7 @@ import 
org.apache.seatunnel.connectors.seatunnel.file.config.ArchiveCompressForm
 import 
org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSourceOptions;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileCompareMode;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileInfo;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileSyncMode;
 import 
org.apache.seatunnel.connectors.seatunnel.file.config.FileUpdateStrategy;
 import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
@@ -66,6 +67,7 @@ import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Comparator;
 import java.util.Date;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -73,6 +75,7 @@ import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipInputStream;
 
@@ -123,6 +126,8 @@ public abstract class AbstractReadStrategy implements 
ReadStrategy {
     protected transient HadoopFileSystemProxy targetHadoopFileSystemProxy;
     protected transient boolean shareTargetFileSystemProxy;
     protected transient boolean checksumUnavailableWarned;
+    protected boolean sortFilesByModTime =
+            FileBaseSourceOptions.SORT_FILES_BY_MOD_TIME.defaultValue();
 
     private static final class UpdateModeStats {
         private long scanned;
@@ -151,9 +156,15 @@ public abstract class AbstractReadStrategy implements 
ReadStrategy {
 
     @Override
     public List<String> getFileNamesByPath(String path) throws IOException {
-        ArrayList<String> fileNames = new ArrayList<>();
+        List<FileInfo> fileInfoList = new ArrayList<>();
         UpdateModeStats updateModeStats = enableUpdateSync ? new 
UpdateModeStats() : null;
-        collectFileNamesByPath(path, fileNames, updateModeStats);
+        collectFileInfoByPath(path, fileInfoList, updateModeStats);
+
+        // Sort by modification time in descending order if enabled
+        if (sortFilesByModTime) {
+            
fileInfoList.sort(Comparator.comparingLong(FileInfo::getModifyTime).reversed());
+        }
+
         if (updateModeStats != null) {
             log.info(
                     "Update sync mode statistics: scanned={}, skipped={}, 
to_sync={}",
@@ -161,19 +172,24 @@ public abstract class AbstractReadStrategy implements 
ReadStrategy {
                     updateModeStats.skipped,
                     updateModeStats.scanned - updateModeStats.skipped);
         }
-        return fileNames;
+
+        for (FileInfo fileInfo : fileInfoList) {
+            this.fileNames.add(fileInfo.getFileName());
+        }
+
+        return 
fileInfoList.stream().map(FileInfo::getFileName).collect(Collectors.toList());
     }
 
-    private void collectFileNamesByPath(
-            String path, List<String> fileNames, UpdateModeStats 
updateModeStats)
+    private void collectFileInfoByPath(
+            String path, List<FileInfo> fileInfoList, UpdateModeStats 
updateModeStats)
             throws IOException {
         FileStatus[] stats = hadoopFileSystemProxy.listStatus(path);
         for (FileStatus fileStatus : stats) {
             if (fileStatus.isDirectory()) {
                 // skip hidden tmp directory, such as .hive-staging_hive
                 if (!fileStatus.getPath().getName().startsWith(".")) {
-                    collectFileNamesByPath(
-                            fileStatus.getPath().toString(), fileNames, 
updateModeStats);
+                    collectFileInfoByPath(
+                            fileStatus.getPath().toString(), fileInfoList, 
updateModeStats);
                 }
                 continue;
             }
@@ -214,8 +230,8 @@ public abstract class AbstractReadStrategy implements 
ReadStrategy {
                 updateModeStats.scanned++;
             }
             if (shouldSyncFileInUpdateMode(fileStatus)) {
-                fileNames.add(filePath);
-                this.fileNames.add(filePath);
+                FileInfo fileInfo = new FileInfo(filePath, 
fileStatus.getModificationTime());
+                fileInfoList.add(fileInfo);
             } else if (updateModeStats != null) {
                 updateModeStats.skipped++;
             }
@@ -319,6 +335,10 @@ public abstract class AbstractReadStrategy implements 
ReadStrategy {
             enableSplitFile =
                     
pluginConfig.getBoolean(FileBaseSourceOptions.ENABLE_FILE_SPLIT.key());
         }
+        if 
(pluginConfig.hasPath(FileBaseSourceOptions.SORT_FILES_BY_MOD_TIME.key())) {
+            sortFilesByModTime =
+                    
pluginConfig.getBoolean(FileBaseSourceOptions.SORT_FILES_BY_MOD_TIME.key());
+        }
 
         if (pluginConfig.hasPath(FileBaseSourceOptions.FILE_PATH.key())
                 && 
pluginConfig.getValue(FileBaseSourceOptions.FILE_PATH.key()).valueType()
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
index d444dc84f7..de2bc75d69 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
@@ -91,8 +91,6 @@ public class ParquetReadStrategy extends AbstractReadStrategy 
{
     private static final long JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH = 2440588;
     private static final String PARQUET = "Parquet";
 
-    private int[] indexes;
-
     @Override
     public void read(String path, String tableId, Collector<SeaTunnelRow> 
output)
             throws FileConnectorException, IOException {
@@ -163,7 +161,8 @@ public class ParquetReadStrategy extends 
AbstractReadStrategy {
                     fields = new Object[fieldsCount];
                 }
                 for (int i = 0; i < fieldsCount; i++) {
-                    Object data = record.get(indexes[i]);
+                    String fieldName = seaTunnelRowType.getFieldName(i);
+                    Object data = record.hasField(fieldName) ? 
record.get(fieldName) : null;
                     fields[i] = resolveObject(data, 
seaTunnelRowType.getFieldType(i));
                 }
                 SeaTunnelRow seaTunnelRow = new SeaTunnelRow(fields);
@@ -198,6 +197,7 @@ public class ParquetReadStrategy extends 
AbstractReadStrategy {
         try (ParquetReader<Group> closeableReader = reader) {
             Group record;
             while ((record = closeableReader.read()) != null) {
+                GroupType recordType = record.getType();
                 Object[] fields;
                 if (isMergePartition) {
                     int index = fieldsCount;
@@ -209,11 +209,17 @@ public class ParquetReadStrategy extends 
AbstractReadStrategy {
                     fields = new Object[fieldsCount];
                 }
                 for (int i = 0; i < fieldsCount; i++) {
+                    String fieldName = seaTunnelRowType.getFieldName(i);
+                    if (!recordType.containsField(fieldName)) {
+                        fields[i] = null;
+                        continue;
+                    }
+                    int fieldIndex = recordType.getFieldIndex(fieldName);
                     fields[i] =
                             resolveGroupObject(
                                     record,
-                                    record.getType().getType(indexes[i]),
-                                    indexes[i],
+                                    recordType.getType(fieldIndex),
+                                    fieldIndex,
                                     seaTunnelRowType.getFieldType(i));
                 }
                 SeaTunnelRow seaTunnelRow = new SeaTunnelRow(fields);
@@ -765,15 +771,12 @@ public class ParquetReadStrategy extends 
AbstractReadStrategy {
         }
         String[] fields = new String[readColumns.size()];
         SeaTunnelDataType<?>[] types = new 
SeaTunnelDataType[readColumns.size()];
-        indexes = new int[readColumns.size()];
         buildColumnsWithErrorCheck(
                 TablePath.DEFAULT,
                 IntStream.range(0, readColumns.size()).iterator(),
                 i -> {
                     fields[i] = readColumns.get(i);
                     Type type = originalSchema.getType(fields[i]);
-                    int fieldIndex = originalSchema.getFieldIndex(fields[i]);
-                    indexes[i] = fieldIndex;
                     SeaTunnelDataType<?> configDataType =
                             getConfigFieldType(configRowType, fields[i]);
                     types[i] = parquetType2SeaTunnelType(type, configDataType, 
fields[i]);
@@ -781,6 +784,11 @@ public class ParquetReadStrategy extends 
AbstractReadStrategy {
 
         seaTunnelRowType = new SeaTunnelRowType(fields, types);
         seaTunnelRowTypeWithPartition = mergePartitionTypes(path, 
seaTunnelRowType);
+
+        log.debug(
+                "get seatunnel row type with user config: {}. path: {}",
+                getActualSeaTunnelRowTypeInfo(),
+                path);
         return getActualSeaTunnelRowTypeInfo();
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategyTest.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategyTest.java
index 74b4f7abe9..626a33bbd7 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategyTest.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategyTest.java
@@ -26,7 +26,7 @@ import org.apache.seatunnel.api.table.type.BasicType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import 
org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSourceOptions;
-import 
org.apache.seatunnel.connectors.seatunnel.file.writer.ParquetReadStrategyTest;
+import org.apache.seatunnel.connectors.seatunnel.file.util.LocalFileSystemConf;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericArray;
@@ -97,8 +97,8 @@ public class AbstractReadStrategyTest {
     public void testReadDirectorySkipHiddenDirectories() throws Exception {
         AutoGenerateParquetData.generateTestData();
         try (ParquetReadStrategy parquetReadStrategy = new 
ParquetReadStrategy(); ) {
-            ParquetReadStrategyTest.LocalConf localConf =
-                    new 
ParquetReadStrategyTest.LocalConf(FS_DEFAULT_NAME_DEFAULT);
+            LocalFileSystemConf.LocalConf localConf =
+                    new LocalFileSystemConf.LocalConf(FS_DEFAULT_NAME_DEFAULT);
             parquetReadStrategy.init(localConf);
             List<String> list =
                     
parquetReadStrategy.getFileNamesByPath(AutoGenerateParquetData.DATA_FILE_PATH);
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/CsvReadStrategyTest.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/CsvReadStrategyTest.java
index 681963468f..0278315adc 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/CsvReadStrategyTest.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/CsvReadStrategyTest.java
@@ -27,7 +27,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import 
org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSourceOptions;
-import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import org.apache.seatunnel.connectors.seatunnel.file.util.LocalFileSystemConf;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
@@ -51,7 +51,8 @@ public class CsvReadStrategyTest {
         URL resource = CsvReadStrategyTest.class.getResource("/test.csv");
         String path = Paths.get(resource.toURI()).toString();
         CsvReadStrategy csvReadStrategy = new CsvReadStrategy();
-        LocalConf localConf = new LocalConf(FS_DEFAULT_NAME_DEFAULT);
+        LocalFileSystemConf.LocalConf localConf =
+                new LocalFileSystemConf.LocalConf(FS_DEFAULT_NAME_DEFAULT);
         csvReadStrategy.init(localConf);
         csvReadStrategy.getFileNamesByPath(path);
         csvReadStrategy.setPluginConfig(ConfigFactory.empty());
@@ -80,7 +81,8 @@ public class CsvReadStrategyTest {
         URL resource = CsvReadStrategyTest.class.getResource("/test-csv.csv");
         String path = Paths.get(resource.toURI()).toString();
         CsvReadStrategy csvReadStrategy = new CsvReadStrategy();
-        LocalConf localConf = new LocalConf(FS_DEFAULT_NAME_DEFAULT);
+        LocalFileSystemConf.LocalConf localConf =
+                new LocalFileSystemConf.LocalConf(FS_DEFAULT_NAME_DEFAULT);
         csvReadStrategy.init(localConf);
         csvReadStrategy.getFileNamesByPath(path);
         System.setProperty("field_delimiter", ";");
@@ -112,7 +114,8 @@ public class CsvReadStrategyTest {
                 
CsvReadStrategyTest.class.getResource("/csv/special_quote_char_break_line.csv");
         String path = Paths.get(resource.toURI()).toString();
         CsvReadStrategy csvReadStrategy = new CsvReadStrategy();
-        LocalConf localConf = new LocalConf(FS_DEFAULT_NAME_DEFAULT);
+        LocalFileSystemConf.LocalConf localConf =
+                new LocalFileSystemConf.LocalConf(FS_DEFAULT_NAME_DEFAULT);
         csvReadStrategy.init(localConf);
         csvReadStrategy.getFileNamesByPath(path);
         
csvReadStrategy.setPluginConfig(ConfigFactory.parseMap(getOptionsForSpecialQuoteChar()));
@@ -170,7 +173,8 @@ public class CsvReadStrategyTest {
         String path = Paths.get(resource.toURI()).toString();
         TestCollector testCollector;
         try (CsvReadStrategy csvReadStrategy = new CsvReadStrategy()) {
-            LocalConf localConf = new LocalConf(FS_DEFAULT_NAME_DEFAULT);
+            LocalFileSystemConf.LocalConf localConf =
+                    new LocalFileSystemConf.LocalConf(FS_DEFAULT_NAME_DEFAULT);
             csvReadStrategy.init(localConf);
             csvReadStrategy.getFileNamesByPath(path);
             
csvReadStrategy.setPluginConfig(ConfigFactory.parseMap(csvBomOptions));
@@ -226,23 +230,4 @@ public class CsvReadStrategyTest {
             return null;
         }
     }
-
-    public static class LocalConf extends HadoopConf {
-        private static final String HDFS_IMPL = 
"org.apache.hadoop.fs.LocalFileSystem";
-        private static final String SCHEMA = "file";
-
-        public LocalConf(String hdfsNameKey) {
-            super(hdfsNameKey);
-        }
-
-        @Override
-        public String getFsHdfsImpl() {
-            return HDFS_IMPL;
-        }
-
-        @Override
-        public String getSchema() {
-            return SCHEMA;
-        }
-    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/OrcReadStrategyTest.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/OrcReadStrategyTest.java
similarity index 89%
rename from 
seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/OrcReadStrategyTest.java
rename to 
seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/OrcReadStrategyTest.java
index b44f2c4c5d..7a4951405c 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/OrcReadStrategyTest.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/OrcReadStrategyTest.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.seatunnel.file.writer;
+package org.apache.seatunnel.connectors.seatunnel.file.source.reader;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
@@ -24,8 +24,7 @@ import 
org.apache.seatunnel.shade.org.apache.commons.lang3.tuple.Pair;
 import org.apache.seatunnel.api.source.Collector;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
-import 
org.apache.seatunnel.connectors.seatunnel.file.source.reader.OrcReadStrategy;
+import org.apache.seatunnel.connectors.seatunnel.file.util.LocalFileSystemConf;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.orc.OrcFile;
@@ -58,7 +57,8 @@ public class OrcReadStrategyTest {
         Assertions.assertNotNull(orcFile);
         String orcFilePath = Paths.get(orcFile.toURI()).toString();
         OrcReadStrategy orcReadStrategy = new OrcReadStrategy();
-        LocalConf localConf = new LocalConf(FS_DEFAULT_NAME_DEFAULT);
+        LocalFileSystemConf.LocalConf localConf =
+                new LocalFileSystemConf.LocalConf(FS_DEFAULT_NAME_DEFAULT);
         orcReadStrategy.init(localConf);
         TestCollector testCollector = new TestCollector();
         SeaTunnelRowType seaTunnelRowTypeInfo =
@@ -76,7 +76,8 @@ public class OrcReadStrategyTest {
     @Test
     public void testReadNotExistedFile() throws Exception {
         OrcReadStrategy orcReadStrategy = new OrcReadStrategy();
-        LocalConf localConf = new LocalConf(FS_DEFAULT_NAME_DEFAULT);
+        LocalFileSystemConf.LocalConf localConf =
+                new LocalFileSystemConf.LocalConf(FS_DEFAULT_NAME_DEFAULT);
         orcReadStrategy.init(localConf);
         Exception exception =
                 Assertions.assertThrows(
@@ -94,7 +95,8 @@ public class OrcReadStrategyTest {
         String orcFilePath = Paths.get(orcFile.toURI()).toString();
         String confPath = Paths.get(conf.toURI()).toString();
         OrcReadStrategy orcReadStrategy = new OrcReadStrategy();
-        LocalConf localConf = new LocalConf(FS_DEFAULT_NAME_DEFAULT);
+        LocalFileSystemConf.LocalConf localConf =
+                new LocalFileSystemConf.LocalConf(FS_DEFAULT_NAME_DEFAULT);
         Config pluginConfig = ConfigFactory.parseFile(new File(confPath));
         orcReadStrategy.init(localConf);
         orcReadStrategy.setPluginConfig(pluginConfig);
@@ -189,23 +191,4 @@ public class OrcReadStrategyTest {
             return null;
         }
     }
-
-    public static class LocalConf extends HadoopConf {
-        private static final String HDFS_IMPL = 
"org.apache.hadoop.fs.LocalFileSystem";
-        private static final String SCHEMA = "file";
-
-        public LocalConf(String hdfsNameKey) {
-            super(hdfsNameKey);
-        }
-
-        @Override
-        public String getFsHdfsImpl() {
-            return HDFS_IMPL;
-        }
-
-        @Override
-        public String getSchema() {
-            return SCHEMA;
-        }
-    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetReadStrategyTest.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategyTest.java
similarity index 83%
rename from 
seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetReadStrategyTest.java
rename to 
seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategyTest.java
index fb9d416bbb..d6d38f0e26 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetReadStrategyTest.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategyTest.java
@@ -15,11 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.seatunnel.file.writer;
+package org.apache.seatunnel.connectors.seatunnel.file.source.reader;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
 
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.source.Collector;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
@@ -29,8 +30,8 @@ import org.apache.seatunnel.api.table.type.MapType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
-import 
org.apache.seatunnel.connectors.seatunnel.file.source.reader.ParquetReadStrategy;
+import 
org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSourceOptions;
+import org.apache.seatunnel.connectors.seatunnel.file.util.LocalFileSystemConf;
 
 import org.apache.avro.Conversions;
 import org.apache.avro.Schema;
@@ -88,7 +89,8 @@ public class ParquetReadStrategyTest {
         Assertions.assertNotNull(resource);
         String path = Paths.get(resource.toURI()).toString();
         ParquetReadStrategy parquetReadStrategy = new ParquetReadStrategy();
-        LocalConf localConf = new LocalConf(FS_DEFAULT_NAME_DEFAULT);
+        LocalFileSystemConf.LocalConf localConf =
+                new LocalFileSystemConf.LocalConf(FS_DEFAULT_NAME_DEFAULT);
         parquetReadStrategy.init(localConf);
         SeaTunnelRowType seaTunnelRowTypeInfo = 
parquetReadStrategy.getSeaTunnelRowTypeInfo(path);
         Assertions.assertNotNull(seaTunnelRowTypeInfo);
@@ -103,7 +105,8 @@ public class ParquetReadStrategyTest {
         Assertions.assertNotNull(resource);
         String path = Paths.get(resource.toURI()).toString();
         ParquetReadStrategy parquetReadStrategy = new ParquetReadStrategy();
-        LocalConf localConf = new LocalConf(FS_DEFAULT_NAME_DEFAULT);
+        LocalFileSystemConf.LocalConf localConf =
+                new LocalFileSystemConf.LocalConf(FS_DEFAULT_NAME_DEFAULT);
         parquetReadStrategy.init(localConf);
         SeaTunnelRowType seaTunnelRowTypeInfo = 
parquetReadStrategy.getSeaTunnelRowTypeInfo(path);
         Assertions.assertNotNull(seaTunnelRowTypeInfo);
@@ -118,7 +121,8 @@ public class ParquetReadStrategyTest {
         Assertions.assertNotNull(resource);
         String path = Paths.get(resource.toURI()).toString();
         ParquetReadStrategy parquetReadStrategy = new ParquetReadStrategy();
-        LocalConf localConf = new LocalConf(FS_DEFAULT_NAME_DEFAULT);
+        LocalFileSystemConf.LocalConf localConf =
+                new LocalFileSystemConf.LocalConf(FS_DEFAULT_NAME_DEFAULT);
         parquetReadStrategy.init(localConf);
         SeaTunnelRowType seaTunnelRowTypeInfo = 
parquetReadStrategy.getSeaTunnelRowTypeInfo(path);
         Assertions.assertNotNull(seaTunnelRowTypeInfo);
@@ -152,7 +156,8 @@ public class ParquetReadStrategyTest {
         String confPath = Paths.get(conf.toURI()).toString();
         Config pluginConfig = ConfigFactory.parseFile(new File(confPath));
         ParquetReadStrategy parquetReadStrategy = new ParquetReadStrategy();
-        LocalConf localConf = new LocalConf(FS_DEFAULT_NAME_DEFAULT);
+        LocalFileSystemConf.LocalConf localConf =
+                new LocalFileSystemConf.LocalConf(FS_DEFAULT_NAME_DEFAULT);
         parquetReadStrategy.init(localConf);
         parquetReadStrategy.setPluginConfig(pluginConfig);
         SeaTunnelRowType seaTunnelRowTypeInfo = 
parquetReadStrategy.getSeaTunnelRowTypeInfo(path);
@@ -181,7 +186,8 @@ public class ParquetReadStrategyTest {
         String confPath = Paths.get(conf.toURI()).toString();
         Config pluginConfig = ConfigFactory.parseFile(new File(confPath));
         ParquetReadStrategy parquetReadStrategy = new ParquetReadStrategy();
-        LocalConf localConf = new LocalConf(FS_DEFAULT_NAME_DEFAULT);
+        LocalFileSystemConf.LocalConf localConf =
+                new LocalFileSystemConf.LocalConf(FS_DEFAULT_NAME_DEFAULT);
         parquetReadStrategy.init(localConf);
         parquetReadStrategy.setPluginConfig(pluginConfig);
         SeaTunnelRowType seaTunnelRowTypeInfo = 
parquetReadStrategy.getSeaTunnelRowTypeInfo(path);
@@ -196,7 +202,8 @@ public class ParquetReadStrategyTest {
     public void testParquetReadArray() throws Exception {
         AutoGenerateParquetData.generateTestData();
         ParquetReadStrategy parquetReadStrategy = new ParquetReadStrategy();
-        LocalConf localConf = new LocalConf(FS_DEFAULT_NAME_DEFAULT);
+        LocalFileSystemConf.LocalConf localConf =
+                new LocalFileSystemConf.LocalConf(FS_DEFAULT_NAME_DEFAULT);
         parquetReadStrategy.init(localConf);
         SeaTunnelRowType seaTunnelRowTypeInfo =
                 
parquetReadStrategy.getSeaTunnelRowTypeInfo(AutoGenerateParquetData.DATA_FILE_PATH);
@@ -231,7 +238,8 @@ public class ParquetReadStrategyTest {
         }
 
         ParquetReadStrategy parquetReadStrategy = new ParquetReadStrategy();
-        LocalConf localConf = new LocalConf(FS_DEFAULT_NAME_DEFAULT);
+        LocalFileSystemConf.LocalConf localConf =
+                new LocalFileSystemConf.LocalConf(FS_DEFAULT_NAME_DEFAULT);
         parquetReadStrategy.init(localConf);
         SeaTunnelRowType seaTunnelRowTypeInfo =
                 
parquetReadStrategy.getSeaTunnelRowTypeInfo(NativeParquetWriter.DATA_FILE_PATH);
@@ -247,7 +255,8 @@ public class ParquetReadStrategyTest {
         NativeParquetWriterWithAvroIncompatibleColumn.generateTestData();
         try {
             ParquetReadStrategy parquetReadStrategy = new 
ParquetReadStrategy();
-            LocalConf localConf = new LocalConf(FS_DEFAULT_NAME_DEFAULT);
+            LocalFileSystemConf.LocalConf localConf =
+                    new LocalFileSystemConf.LocalConf(FS_DEFAULT_NAME_DEFAULT);
             parquetReadStrategy.init(localConf);
             SeaTunnelRowType rowType =
                     parquetReadStrategy.getSeaTunnelRowTypeInfo(
@@ -274,7 +283,8 @@ public class ParquetReadStrategyTest {
         
NativeParquetWriterWithAvroIncompatibleColumnAndList.generateTestData();
         try {
             ParquetReadStrategy parquetReadStrategy = new 
ParquetReadStrategy();
-            LocalConf localConf = new LocalConf(FS_DEFAULT_NAME_DEFAULT);
+            LocalFileSystemConf.LocalConf localConf =
+                    new LocalFileSystemConf.LocalConf(FS_DEFAULT_NAME_DEFAULT);
             parquetReadStrategy.init(localConf);
             SeaTunnelRowType rowType =
                     parquetReadStrategy.getSeaTunnelRowTypeInfo(
@@ -300,6 +310,40 @@ public class ParquetReadStrategyTest {
         }
     }
 
+    @DisabledOnOs(OS.WINDOWS)
+    @Test
+    public void testParquetReadColumnsWithNativeParquetFallback() throws 
Exception {
+        NativeParquetWriterWithAvroIncompatibleColumn.generateTestData();
+        try {
+            ParquetReadStrategy parquetReadStrategy = new 
ParquetReadStrategy();
+            LocalFileSystemConf.LocalConf localConf =
+                    new LocalFileSystemConf.LocalConf(FS_DEFAULT_NAME_DEFAULT);
+            parquetReadStrategy.init(localConf);
+            parquetReadStrategy.setPluginConfig(
+                    ConfigFactory.parseString(
+                            "read_columns = [job_blue-collar, id]\n"
+                                    + "parse_partition_from_path = false"));
+            // The inferred row type must follow the read_columns order: job_blue-collar, then id.
+            SeaTunnelRowType rowType =
+                    parquetReadStrategy.getSeaTunnelRowTypeInfo(
+                            
NativeParquetWriterWithAvroIncompatibleColumn.DATA_FILE_PATH);
+            Assertions.assertEquals("job_blue-collar", 
rowType.getFieldName(0));
+            Assertions.assertEquals("id", rowType.getFieldName(1));
+            // Reading the same file must yield exactly one row holding the two projected values.
+            TestCollector testCollector = new TestCollector();
+            parquetReadStrategy.read(
+                    
NativeParquetWriterWithAvroIncompatibleColumn.DATA_FILE_PATH,
+                    "",
+                    testCollector);
+            Assertions.assertEquals(1, testCollector.getRows().size());
+            SeaTunnelRow row = testCollector.getRows().get(0);
+            Assertions.assertEquals("engineer", row.getField(0));
+            Assertions.assertEquals(1, row.getField(1));
+        } finally {
+            NativeParquetWriterWithAvroIncompatibleColumn.deleteFile();
+        }
+    }
+
     @DisabledOnOs(OS.WINDOWS)
     @Test
     public void testParquetWithUserConfigRowType() throws Exception {
@@ -313,7 +357,8 @@ public class ParquetReadStrategyTest {
         CatalogTable catalogTable = 
CatalogTableUtil.buildWithConfig(pluginConfig);
 
         ParquetReadStrategy parquetReadStrategy = new ParquetReadStrategy();
-        LocalConf localConf = new LocalConf(FS_DEFAULT_NAME_DEFAULT);
+        LocalFileSystemConf.LocalConf localConf =
+                new LocalFileSystemConf.LocalConf(FS_DEFAULT_NAME_DEFAULT);
         parquetReadStrategy.init(localConf);
 
         SeaTunnelRowType configRowType = catalogTable.getSeaTunnelRowType();
@@ -385,25 +430,6 @@ public class ParquetReadStrategyTest {
         }
     }
 
-    public static class LocalConf extends HadoopConf {
-        private static final String HDFS_IMPL = 
"org.apache.hadoop.fs.LocalFileSystem";
-        private static final String SCHEMA = "file";
-
-        public LocalConf(String hdfsNameKey) {
-            super(hdfsNameKey);
-        }
-
-        @Override
-        public String getFsHdfsImpl() {
-            return HDFS_IMPL;
-        }
-
-        @Override
-        public String getSchema() {
-            return SCHEMA;
-        }
-    }
-
     public static class AutoGenerateParquetData {
 
         public static final String DATA_FILE_PATH = "/tmp/data.parquet";
@@ -544,7 +570,8 @@ public class ParquetReadStrategyTest {
     public void testParquetReadNestedArray() throws Exception {
         AutoGenerateParquetDataWithNestedArray.generateTestData();
         ParquetReadStrategy parquetReadStrategy = new ParquetReadStrategy();
-        LocalConf localConf = new LocalConf(FS_DEFAULT_NAME_DEFAULT);
+        LocalFileSystemConf.LocalConf localConf =
+                new LocalFileSystemConf.LocalConf(FS_DEFAULT_NAME_DEFAULT);
         parquetReadStrategy.init(localConf);
         SeaTunnelRowType seaTunnelRowTypeInfo =
                 parquetReadStrategy.getSeaTunnelRowTypeInfo(
@@ -597,7 +624,8 @@ public class ParquetReadStrategyTest {
     public void testParquetReadNestedBytesArray() throws Exception {
         AutoGenerateParquetDataWithNestedBytesArray.generateTestData();
         ParquetReadStrategy parquetReadStrategy = new ParquetReadStrategy();
-        LocalConf localConf = new LocalConf(FS_DEFAULT_NAME_DEFAULT);
+        LocalFileSystemConf.LocalConf localConf =
+                new LocalFileSystemConf.LocalConf(FS_DEFAULT_NAME_DEFAULT);
         parquetReadStrategy.init(localConf);
 
         SeaTunnelRowType seaTunnelRowTypeInfo =
@@ -650,7 +678,8 @@ public class ParquetReadStrategyTest {
     public void testParquetReadNestedArrayWithUserConfigRowType() throws 
Exception {
         AutoGenerateParquetDataWithNestedArray.generateTestData();
         ParquetReadStrategy parquetReadStrategy = new ParquetReadStrategy();
-        LocalConf localConf = new LocalConf(FS_DEFAULT_NAME_DEFAULT);
+        LocalFileSystemConf.LocalConf localConf =
+                new LocalFileSystemConf.LocalConf(FS_DEFAULT_NAME_DEFAULT);
         parquetReadStrategy.init(localConf);
 
         ArrayType<String[], String> stringArrayType = 
ArrayType.STRING_ARRAY_TYPE;
@@ -728,7 +757,8 @@ public class ParquetReadStrategyTest {
     public void testParquetReadArrayOfMap() throws Exception {
         AutoGenerateParquetDataWithArrayOfMap.generateTestData();
         ParquetReadStrategy parquetReadStrategy = new ParquetReadStrategy();
-        LocalConf localConf = new LocalConf(FS_DEFAULT_NAME_DEFAULT);
+        LocalFileSystemConf.LocalConf localConf =
+                new LocalFileSystemConf.LocalConf(FS_DEFAULT_NAME_DEFAULT);
         parquetReadStrategy.init(localConf);
         SeaTunnelRowType seaTunnelRowTypeInfo =
                 parquetReadStrategy.getSeaTunnelRowTypeInfo(
@@ -1090,4 +1120,139 @@ public class ParquetReadStrategyTest {
             }
         }
     }
+
+    /**
+     * Lists the generated files with {@code SORT_FILES_BY_MOD_TIME} enabled and expects the row
+     * type of the first listed file to expose all five columns, including {@code department}.
+     *
+     * <p>Fixtures are deleted in {@code finally} so a failed assertion cannot leave them on disk.
+     */
+    @DisabledOnOs(OS.WINDOWS)
+    @Test
+    public void testParquetSchemaMerge() throws Exception {
+        AutoGenerateParquetDataWithSchemaMerge.generateTestData();
+        try {
+            ParquetReadStrategy parquetReadStrategy = new ParquetReadStrategy();
+            LocalFileSystemConf.LocalConf localConf =
+                    new LocalFileSystemConf.LocalConf(FS_DEFAULT_NAME_DEFAULT);
+            parquetReadStrategy.init(localConf);
+
+            Map<String, Object> props = new HashMap<>();
+            props.put(FileBaseSourceOptions.FILENAME_EXTENSION.key(), "parquet");
+            props.put(FileBaseSourceOptions.SORT_FILES_BY_MOD_TIME.key(), true);
+            ReadonlyConfig readonlyConfig = ReadonlyConfig.fromMap(props);
+            parquetReadStrategy.setPluginConfig(readonlyConfig.toConfig());
+
+            List<String> fileNames =
+                    parquetReadStrategy.getFileNamesByPath(
+                            AutoGenerateParquetDataWithSchemaMerge.TMP_PATH);
+            Assertions.assertNotNull(fileNames);
+            Assertions.assertEquals(2, fileNames.size());
+
+            // NOTE(review): assumes the newer file is listed first - confirm the sort direction.
+            String firstFileName = fileNames.get(0);
+            Assertions.assertNotNull(firstFileName);
+
+            SeaTunnelRowType firstFileRowType =
+                    parquetReadStrategy.getSeaTunnelRowTypeInfoWithUserConfigRowType(
+                            firstFileName, null);
+            Assertions.assertNotNull(firstFileRowType);
+            String[] fieldNames = firstFileRowType.getFieldNames();
+            Assertions.assertNotNull(fieldNames);
+            Assertions.assertEquals(5, fieldNames.length);
+            Assertions.assertEquals("id", fieldNames[0]);
+            Assertions.assertEquals("name", fieldNames[1]);
+            Assertions.assertEquals("age", fieldNames[2]);
+            Assertions.assertEquals("salary", fieldNames[3]);
+            Assertions.assertEquals("department", fieldNames[4]);
+        } finally {
+            AutoGenerateParquetDataWithSchemaMerge.deleteFiles();
+        }
+    }
+
+    /** Writes two single-record parquet files whose schemas differ by one trailing column. */
+    public static class AutoGenerateParquetDataWithSchemaMerge {
+
+        public static final String TMP_PATH = "/tmp/seatunnel/parquet/schemaMerge/";
+        public static final String OLD_FILE_PATH = TMP_PATH + "/old_data.parquet";
+        public static final String NEW_FILE_PATH = TMP_PATH + "/new_data.parquet";
+
+        /** Removes any previous fixtures, then writes the old file followed by the new file. */
+        public static void generateTestData() throws IOException {
+            deleteFiles();
+
+            generateOldParquetFile();
+            generateNewParquetFile();
+        }
+
+        /** Writes {@link #OLD_FILE_PATH} with the columns id, name, age and salary. */
+        public static void generateOldParquetFile() throws IOException {
+            String schemaString =
+                    "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
+                            + "{\"name\":\"id\",\"type\":\"int\"},"
+                            + "{\"name\":\"name\",\"type\":\"string\"},"
+                            + "{\"name\":\"age\",\"type\":\"int\"},"
+                            + "{\"name\":\"salary\",\"type\":\"float\"}"
+                            + "]}";
+            Schema schema = new Schema.Parser().parse(schemaString);
+
+            GenericRecord record = new GenericData.Record(schema);
+            record.put("id", 1);
+            record.put("name", "Alice");
+            record.put("age", 30);
+            record.put("salary", 50000.0f);
+
+            writeSingleRecord(OLD_FILE_PATH, record);
+        }
+
+        /** Writes {@link #NEW_FILE_PATH} with the old columns plus a trailing department column. */
+        public static void generateNewParquetFile() throws IOException {
+            String schemaString =
+                    "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
+                            + "{\"name\":\"id\",\"type\":\"int\"},"
+                            + "{\"name\":\"name\",\"type\":\"string\"},"
+                            + "{\"name\":\"age\",\"type\":\"int\"},"
+                            + "{\"name\":\"salary\",\"type\":\"float\"},"
+                            + "{\"name\":\"department\",\"type\":\"string\"}"
+                            + "]}";
+            Schema schema = new Schema.Parser().parse(schemaString);
+
+            GenericRecord record = new GenericData.Record(schema);
+            record.put("id", 2);
+            record.put("name", "Bob");
+            record.put("age", 35);
+            record.put("salary", 60000.0f);
+            record.put("department", "Engineering");
+
+            writeSingleRecord(NEW_FILE_PATH, record);
+        }
+
+        /**
+         * Writes {@code record} as the only row of a SNAPPY-compressed parquet file at {@code
+         * filePath}, closing the writer even when the write fails.
+         */
+        private static void writeSingleRecord(String filePath, GenericRecord record)
+                throws IOException {
+            try (ParquetWriter<GenericRecord> writer =
+                    AvroParquetWriter.<GenericRecord>builder(new Path(filePath))
+                            .withSchema(record.getSchema())
+                            .withConf(new Configuration())
+                            .withCompressionCodec(CompressionCodecName.SNAPPY)
+                            .build()) {
+                writer.write(record);
+            }
+        }
+
+        /** Deletes both generated files if present; the result of each delete is ignored. */
+        public static void deleteFiles() {
+            File oldFile = new File(OLD_FILE_PATH);
+            if (oldFile.exists()) {
+                oldFile.delete();
+            }
+            File newFile = new File(NEW_FILE_PATH);
+            if (newFile.exists()) {
+                newFile.delete();
+            }
+        }
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ReadStrategyEncodingTest.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategyEncodingTest.java
similarity index 86%
rename from 
seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ReadStrategyEncodingTest.java
rename to 
seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategyEncodingTest.java
index d16bd96f9c..2229f61091 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ReadStrategyEncodingTest.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategyEncodingTest.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.seatunnel.file.writer;
+package org.apache.seatunnel.connectors.seatunnel.file.source.reader;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
@@ -25,11 +25,7 @@ import org.apache.seatunnel.api.source.Collector;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
-import 
org.apache.seatunnel.connectors.seatunnel.file.source.reader.AbstractReadStrategy;
-import 
org.apache.seatunnel.connectors.seatunnel.file.source.reader.JsonReadStrategy;
-import 
org.apache.seatunnel.connectors.seatunnel.file.source.reader.TextReadStrategy;
-import 
org.apache.seatunnel.connectors.seatunnel.file.source.reader.XmlReadStrategy;
+import org.apache.seatunnel.connectors.seatunnel.file.util.LocalFileSystemConf;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
@@ -114,7 +110,8 @@ public class ReadStrategyEncodingTest {
         String sourceFilePath = Paths.get(sourceFile.toURI()).toString();
         String confPath = Paths.get(conf.toURI()).toString();
         TestCollector testCollector;
-        LocalConf localConf = new LocalConf(FS_DEFAULT_NAME_DEFAULT);
+        LocalFileSystemConf.LocalConf localConf =
+                new LocalFileSystemConf.LocalConf(FS_DEFAULT_NAME_DEFAULT);
         Config pluginConfig = ConfigFactory.parseFile(new File(confPath));
         readStrategy.setPluginConfig(pluginConfig);
         readStrategy.init(localConf);
@@ -168,23 +165,4 @@ public class ReadStrategyEncodingTest {
             return null;
         }
     }
-
-    public static class LocalConf extends HadoopConf {
-        private static final String HDFS_IMPL = 
"org.apache.hadoop.fs.LocalFileSystem";
-        private static final String SCHEMA = "file";
-
-        public LocalConf(String hdfsNameKey) {
-            super(hdfsNameKey);
-        }
-
-        @Override
-        public String getFsHdfsImpl() {
-            return HDFS_IMPL;
-        }
-
-        @Override
-        public String getSchema() {
-            return SCHEMA;
-        }
-    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/UpdateSyncModeTest.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/UpdateSyncModeTest.java
index 9455eeef6f..fff09538f5 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/UpdateSyncModeTest.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/UpdateSyncModeTest.java
@@ -20,8 +20,8 @@ package 
org.apache.seatunnel.connectors.seatunnel.file.source.reader;
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
 
-import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
 import 
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.file.util.LocalFileSystemConf;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
@@ -118,7 +118,7 @@ class UpdateSyncModeTest {
                             targetDir.toUri().toString(),
                             "distcp",
                             "len_mtime"));
-            strategy.init(new LocalConf(FS_DEFAULT_NAME_DEFAULT));
+            strategy.init(new 
LocalFileSystemConf.LocalConf(FS_DEFAULT_NAME_DEFAULT));
 
             List<String> files = 
strategy.getFileNamesByPath(sourceDir.toUri().toString());
             Assertions.assertTrue(files.isEmpty(), "Target is newer with same 
len -> SKIP");
@@ -144,7 +144,7 @@ class UpdateSyncModeTest {
                             targetDir.toUri().toString(),
                             "distcp",
                             "len_mtime"));
-            strategy.init(new LocalConf(FS_DEFAULT_NAME_DEFAULT));
+            strategy.init(new 
LocalFileSystemConf.LocalConf(FS_DEFAULT_NAME_DEFAULT));
 
             List<String> files = 
strategy.getFileNamesByPath(sourceDir.toUri().toString());
             Assertions.assertEquals(1, files.size());
@@ -171,7 +171,7 @@ class UpdateSyncModeTest {
                             targetDir.toUri().toString(),
                             "strict",
                             "checksum"));
-            strategy.init(new LocalConf(FS_DEFAULT_NAME_DEFAULT));
+            strategy.init(new 
LocalFileSystemConf.LocalConf(FS_DEFAULT_NAME_DEFAULT));
 
             List<String> files = 
strategy.getFileNamesByPath(sourceDir.toUri().toString());
             Assertions.assertTrue(files.isEmpty(), "Checksum equal -> SKIP");
@@ -195,7 +195,7 @@ class UpdateSyncModeTest {
                             targetDir.toUri().toString(),
                             "strict",
                             "checksum"));
-            strategy.init(new LocalConf(FS_DEFAULT_NAME_DEFAULT));
+            strategy.init(new 
LocalFileSystemConf.LocalConf(FS_DEFAULT_NAME_DEFAULT));
 
             List<String> files = 
strategy.getFileNamesByPath(sourceDir.toUri().toString());
             Assertions.assertEquals(1, files.size());
@@ -223,23 +223,4 @@ class UpdateSyncModeTest {
         configMap.put("compare_mode", compareMode);
         return ConfigFactory.parseMap(configMap);
     }
-
-    static class LocalConf extends HadoopConf {
-        private static final String HDFS_IMPL = 
"org.apache.hadoop.fs.LocalFileSystem";
-        private static final String SCHEMA = "file";
-
-        public LocalConf(String hdfsNameKey) {
-            super(hdfsNameKey);
-        }
-
-        @Override
-        public String getFsHdfsImpl() {
-            return HDFS_IMPL;
-        }
-
-        @Override
-        public String getSchema() {
-            return SCHEMA;
-        }
-    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/XmlReadStrategyTest.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/XmlReadStrategyTest.java
similarity index 89%
rename from 
seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/XmlReadStrategyTest.java
rename to 
seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/XmlReadStrategyTest.java
index fca8f68fd2..01ed161888 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/XmlReadStrategyTest.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/XmlReadStrategyTest.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.seatunnel.file.writer;
+package org.apache.seatunnel.connectors.seatunnel.file.source.reader;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
@@ -27,8 +27,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.common.utils.DateTimeUtils;
 import org.apache.seatunnel.common.utils.DateUtils;
 import org.apache.seatunnel.common.utils.TimeUtils;
-import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
-import 
org.apache.seatunnel.connectors.seatunnel.file.source.reader.XmlReadStrategy;
+import org.apache.seatunnel.connectors.seatunnel.file.util.LocalFileSystemConf;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
@@ -62,7 +61,8 @@ public class XmlReadStrategyTest {
         String confPath = Paths.get(conf.toURI()).toString();
         Config pluginConfig = ConfigFactory.parseFile(new File(confPath));
         XmlReadStrategy xmlReadStrategy = new XmlReadStrategy();
-        LocalConf localConf = new LocalConf(FS_DEFAULT_NAME_DEFAULT);
+        LocalFileSystemConf.LocalConf localConf =
+                new LocalFileSystemConf.LocalConf(FS_DEFAULT_NAME_DEFAULT);
         xmlReadStrategy.setPluginConfig(pluginConfig);
         xmlReadStrategy.init(localConf);
         List<String> fileNamesByPath = 
xmlReadStrategy.getFileNamesByPath(xmlFilePath);
@@ -135,23 +135,4 @@ public class XmlReadStrategyTest {
             return null;
         }
     }
-
-    public static class LocalConf extends HadoopConf {
-        private static final String HDFS_IMPL = 
"org.apache.hadoop.fs.LocalFileSystem";
-        private static final String SCHEMA = "file";
-
-        public LocalConf(String hdfsNameKey) {
-            super(hdfsNameKey);
-        }
-
-        @Override
-        public String getFsHdfsImpl() {
-            return HDFS_IMPL;
-        }
-
-        @Override
-        public String getSchema() {
-            return SCHEMA;
-        }
-    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/util/LocalFileSystemConf.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/util/LocalFileSystemConf.java
new file mode 100644
index 0000000000..a573837413
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/util/LocalFileSystemConf.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.util;
+
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+
+/** Test-only holder for a {@link HadoopConf} variant shared by the file connector tests. */
+public final class LocalFileSystemConf {
+
+    private LocalFileSystemConf() {
+        // Static holder for LocalConf; never instantiated.
+    }
+
+    /** {@link HadoopConf} that reports the Hadoop local file system and the "file" schema. */
+    public static class LocalConf extends HadoopConf {
+        private static final String HDFS_IMPL = "org.apache.hadoop.fs.LocalFileSystem";
+        private static final String SCHEMA = "file";
+
+        public LocalConf(String hdfsNameKey) {
+            super(hdfsNameKey);
+        }
+
+        @Override
+        public String getFsHdfsImpl() {
+            return HDFS_IMPL;
+        }
+
+        @Override
+        public String getSchema() {
+            return SCHEMA;
+        }
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/CsvWriteStrategyTest.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/CsvWriteStrategyTest.java
index 55e33e194c..c84841a164 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/CsvWriteStrategyTest.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/CsvWriteStrategyTest.java
@@ -30,6 +30,7 @@ import 
org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
 import 
org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.file.sink.writer.CsvWriteStrategy;
 import 
org.apache.seatunnel.connectors.seatunnel.file.source.reader.CsvReadStrategy;
+import org.apache.seatunnel.connectors.seatunnel.file.util.LocalFileSystemConf;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
@@ -66,8 +67,8 @@ public class CsvWriteStrategyTest {
         FileSinkConfig writeSinkConfig =
                 new FileSinkConfig(ReadonlyConfig.fromMap(writeConfig), 
writeRowType);
         CsvWriteStrategy writeStrategy = new CsvWriteStrategy(writeSinkConfig);
-        ParquetReadStrategyTest.LocalConf hadoopConf =
-                new ParquetReadStrategyTest.LocalConf(FS_DEFAULT_NAME_DEFAULT);
+        LocalFileSystemConf.LocalConf hadoopConf =
+                new LocalFileSystemConf.LocalConf(FS_DEFAULT_NAME_DEFAULT);
         writeStrategy.setCatalogTable(
                 CatalogTableUtil.getCatalogTable("test", null, null, "test", 
writeRowType));
         writeStrategy.init(hadoopConf, "test1", "test1", 0);
@@ -129,8 +130,8 @@ public class CsvWriteStrategyTest {
         FileSinkConfig writeSinkConfig =
                 new FileSinkConfig(ReadonlyConfig.fromMap(writeConfig), 
writeRowType);
         CsvWriteStrategy writeStrategy = new CsvWriteStrategy(writeSinkConfig);
-        ParquetReadStrategyTest.LocalConf hadoopConf =
-                new ParquetReadStrategyTest.LocalConf(FS_DEFAULT_NAME_DEFAULT);
+        LocalFileSystemConf.LocalConf hadoopConf =
+                new LocalFileSystemConf.LocalConf(FS_DEFAULT_NAME_DEFAULT);
         writeStrategy.setCatalogTable(
                 CatalogTableUtil.getCatalogTable("test", null, null, "test", 
writeRowType));
         writeStrategy.init(hadoopConf, "test1", "test1", 0);
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/FileSinkConfigTest.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/FileSinkConfigTest.java
index db048b01b8..b1317c5ff7 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/FileSinkConfigTest.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/FileSinkConfigTest.java
@@ -38,7 +38,7 @@ public class FileSinkConfigTest {
 
     @Test
     public void testConfigInit() throws Exception {
-        URL conf = 
OrcReadStrategyTest.class.getResource("/test_write_hdfs.conf");
+        URL conf = 
FileSinkConfigTest.class.getResource("/test_write_hdfs.conf");
         Assertions.assertNotNull(conf);
         String confPath = Paths.get(conf.toURI()).toString();
         Config config = ConfigFactory.parseFile(new File(confPath));
@@ -53,7 +53,7 @@ public class FileSinkConfigTest {
 
     @Test
     public void testConfigInitDefault() throws Exception {
-        URL conf = 
OrcReadStrategyTest.class.getResource("/test_write_hdfs_default_format.conf");
+        URL conf = 
FileSinkConfigTest.class.getResource("/test_write_hdfs_default_format.conf");
         Assertions.assertNotNull(conf);
         String confPath = Paths.get(conf.toURI()).toString();
         Config config = ConfigFactory.parseFile(new File(confPath));
@@ -68,7 +68,7 @@ public class FileSinkConfigTest {
 
     @Test
     public void testSinkColumnsGreaterThanSource() throws Exception {
-        URL conf = 
OrcReadStrategyTest.class.getResource("/test_write_hive.conf");
+        URL conf = 
FileSinkConfigTest.class.getResource("/test_write_hive.conf");
         Assertions.assertNotNull(conf);
         String confPath = Paths.get(conf.toURI()).toString();
         Config config = ConfigFactory.parseFile(new File(confPath));
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/OrcWriteStrategyTest.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/OrcWriteStrategyTest.java
index fa179d5097..d75c04ea03 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/OrcWriteStrategyTest.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/OrcWriteStrategyTest.java
@@ -28,6 +28,7 @@ import 
org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
 import 
org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.file.sink.writer.OrcWriteStrategy;
 import 
org.apache.seatunnel.connectors.seatunnel.file.source.reader.OrcReadStrategy;
+import org.apache.seatunnel.connectors.seatunnel.file.util.LocalFileSystemConf;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
@@ -66,8 +67,8 @@ public class OrcWriteStrategyTest {
                 new FileSinkConfig(ReadonlyConfig.fromMap(writeConfig), 
writeRowType);
         OrcWriteStrategy writeStrategy = new OrcWriteStrategy(writeSinkConfig);
 
-        OrcReadStrategyTest.LocalConf hadoopConf =
-                new OrcReadStrategyTest.LocalConf(FS_DEFAULT_NAME_DEFAULT);
+        LocalFileSystemConf.LocalConf hadoopConf =
+                new LocalFileSystemConf.LocalConf(FS_DEFAULT_NAME_DEFAULT);
         writeStrategy.setCatalogTable(
                 CatalogTableUtil.getCatalogTable("test", null, null, "test", 
writeRowType));
         writeStrategy.init(hadoopConf, "test1", "test1", 0);
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetTypeCoercionTest.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetTypeCoercionTest.java
index 006f530b50..22c645746f 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetTypeCoercionTest.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetTypeCoercionTest.java
@@ -33,6 +33,7 @@ import 
org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
 import 
org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.file.sink.writer.ParquetWriteStrategy;
 import 
org.apache.seatunnel.connectors.seatunnel.file.source.reader.ParquetReadStrategy;
+import org.apache.seatunnel.connectors.seatunnel.file.util.LocalFileSystemConf;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
@@ -218,8 +219,8 @@ public class ParquetTypeCoercionTest {
                 new FileSinkConfig(
                         
ReadonlyConfig.fromConfig(ConfigFactory.parseMap(writeConfig)), rowType);
         ParquetWriteStrategy strategy = new ParquetWriteStrategy(sinkConfig);
-        ParquetReadStrategyTest.LocalConf hadoopConf =
-                new ParquetReadStrategyTest.LocalConf(FS_DEFAULT_NAME_DEFAULT);
+        LocalFileSystemConf.LocalConf hadoopConf =
+                new LocalFileSystemConf.LocalConf(FS_DEFAULT_NAME_DEFAULT);
         strategy.setCatalogTable(
                 CatalogTableUtil.getCatalogTable("test", null, null, testName, 
rowType));
         strategy.init(hadoopConf, "test-" + testName, "test-" + testName, 0);
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetWriteStrategyEvolutionTest.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetWriteStrategyEvolutionTest.java
index 67850bf2ef..0f63f7dabf 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetWriteStrategyEvolutionTest.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetWriteStrategyEvolutionTest.java
@@ -34,6 +34,7 @@ import 
org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
 import 
org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.file.sink.writer.ParquetWriteStrategy;
 import 
org.apache.seatunnel.connectors.seatunnel.file.source.reader.ParquetReadStrategy;
+import org.apache.seatunnel.connectors.seatunnel.file.util.LocalFileSystemConf;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
@@ -91,8 +92,8 @@ public class ParquetWriteStrategyEvolutionTest {
                         
ReadonlyConfig.fromConfig(ConfigFactory.parseMap(writeConfig)),
                         baseRowType);
         ParquetWriteStrategy strategy = new ParquetWriteStrategy(sinkConfig);
-        ParquetReadStrategyTest.LocalConf hadoopConf =
-                new ParquetReadStrategyTest.LocalConf(FS_DEFAULT_NAME_DEFAULT);
+        LocalFileSystemConf.LocalConf hadoopConf =
+                new LocalFileSystemConf.LocalConf(FS_DEFAULT_NAME_DEFAULT);
         strategy.setCatalogTable(
                 CatalogTableUtil.getCatalogTable(
                         "test", null, null, "evolution_table", baseRowType));
@@ -194,8 +195,8 @@ public class ParquetWriteStrategyEvolutionTest {
                         
ReadonlyConfig.fromConfig(ConfigFactory.parseMap(writeConfig)),
                         baseRowType);
         ParquetWriteStrategy strategy = new ParquetWriteStrategy(sinkConfig);
-        ParquetReadStrategyTest.LocalConf hadoopConf =
-                new ParquetReadStrategyTest.LocalConf(FS_DEFAULT_NAME_DEFAULT);
+        LocalFileSystemConf.LocalConf hadoopConf =
+                new LocalFileSystemConf.LocalConf(FS_DEFAULT_NAME_DEFAULT);
         strategy.setCatalogTable(
                 CatalogTableUtil.getCatalogTable(
                         "test", null, null, "evolution_table", baseRowType));
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetWriteStrategyTest.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetWriteStrategyTest.java
index 7fa6ced60d..f24def6288 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetWriteStrategyTest.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetWriteStrategyTest.java
@@ -30,6 +30,7 @@ import 
org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
 import 
org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.file.sink.writer.ParquetWriteStrategy;
 import 
org.apache.seatunnel.connectors.seatunnel.file.source.reader.ParquetReadStrategy;
+import org.apache.seatunnel.connectors.seatunnel.file.util.LocalFileSystemConf;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.parquet.hadoop.ParquetFileReader;
@@ -80,8 +81,8 @@ public class ParquetWriteStrategyTest {
         FileSinkConfig writeSinkConfig =
                 new FileSinkConfig(ReadonlyConfig.fromMap(writeConfig), 
writeRowType);
         ParquetWriteStrategy writeStrategy = new 
ParquetWriteStrategy(writeSinkConfig);
-        ParquetReadStrategyTest.LocalConf hadoopConf =
-                new ParquetReadStrategyTest.LocalConf(FS_DEFAULT_NAME_DEFAULT);
+        LocalFileSystemConf.LocalConf hadoopConf =
+                new LocalFileSystemConf.LocalConf(FS_DEFAULT_NAME_DEFAULT);
         writeStrategy.setCatalogTable(
                 CatalogTableUtil.getCatalogTable("test", null, null, "test", 
writeRowType));
         writeStrategy.init(hadoopConf, "test1", "test1", 0);

Reply via email to