This is an automated email from the ASF dual-hosted git repository.

lidongdai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 4c110c8fdf [Feature][Connector-V2][HdfsFile] Support true large-file split for parallel read (#10332)
4c110c8fdf is described below

commit 4c110c8fdfe740de3c22bf1cd0036eca7aab7d76
Author: yzeng1618 <[email protected]>
AuthorDate: Thu Jan 29 17:48:27 2026 +0800

    [Feature][Connector-V2][HdfsFile] Support true large-file split for parallel read (#10332)
    
    Co-authored-by: zengyi <[email protected]>
---
 docs/en/connectors/source/HdfsFile.md              |  26 +++
 docs/en/connectors/source/LocalFile.md             |  15 +-
 docs/zh/connectors/source/HdfsFile.md              |  26 +++
 docs/zh/connectors/source/LocalFile.md             |  13 ++
 .../file/exception/FileConnectorErrorCode.java     |   5 +-
 .../file/source/BaseMultipleTableFileSource.java   |  24 +-
 .../file/source/reader/AbstractReadStrategy.java   |  22 +-
 .../split/AccordingToSplitSizeSplitStrategy.java   | 242 +++++++++++++++------
 .../file/source/split/FileSourceSplit.java         |  14 ++
 .../source/split/FileSplitStrategyFactory.java}    |  23 +-
 .../MultipleTableFileSourceSplitEnumerator.java    |  68 +++++-
 .../split/MultipleTableFileSplitStrategy.java      |  72 ++++++
 .../source/split/ParquetFileSplitStrategy.java     |  74 +++++--
 .../source/reader/AbstractReadStrategyTest.java    |  81 +++++++
 .../split/FileSourceSplitCompatibilityTest.java    | 209 ++++++++++++++++++
 .../seatunnel/file/hdfs/source/HdfsFileSource.java |   6 +-
 .../file/hdfs/source/HdfsFileSourceFactory.java    |  12 +
 ...sFileAccordingToSplitSizeSplitStrategyTest.java | 229 +++++++++++++++++++
 .../split/HdfsFileSplitStrategyFactoryTest.java    | 123 +++++++++++
 .../file/local/source/LocalFileSource.java         |   9 +-
 ...LocalFileAccordingToSplitSizeSplitStrategy.java |  47 ++--
 .../seatunnel/file/local/LocalFileSourceTest.java  |  61 ++++--
 .../file/local/SplitFileStrategyTest.java          | 141 ++++++------
 .../e2e/connector/file/hdfs/HdfsFileIT.java        |  41 ++++
 .../resources/hdfs_parquet_split_to_assert.conf    |  84 +++++++
 .../test/resources/hdfs_text_split_to_assert.conf  |  58 +++++
 26 files changed, 1503 insertions(+), 222 deletions(-)

diff --git a/docs/en/connectors/source/HdfsFile.md 
b/docs/en/connectors/source/HdfsFile.md
index 27e50e40f6..17e9285317 100644
--- a/docs/en/connectors/source/HdfsFile.md
+++ b/docs/en/connectors/source/HdfsFile.md
@@ -88,6 +88,8 @@ Read data from hdfs file system.
 | common-options             |         | no       | -         | Source plugin common parameters, please refer to [Source Common Options](../source-common-options.md) for details. |
 | file_filter_modified_start | string  | no       | -         | File modification time filter. The connector filters files based on the last modification start time (start time included). The default date format is `yyyy-MM-dd HH:mm:ss`. |
 | file_filter_modified_end   | string  | no       | -         | File modification time filter. The connector filters files based on the last modification end time (end time excluded). The default date format is `yyyy-MM-dd HH:mm:ss`. |
+| enable_file_split          | boolean | no       | false     | Turn on logical file splitting to improve parallelism for huge files. Only supported for `text`/`csv`/`json`/`parquet` in non-compressed format. |
+| file_split_size            | long    | no       | 134217728 | Split size in bytes when `enable_file_split=true`. For `text`/`csv`/`json`, the split end is aligned to the next `row_delimiter`. For `parquet`, the split unit is the RowGroup and a RowGroup is never broken. |
 | quote_char                 | string  | no       | "         | A single character that encloses CSV fields, allowing fields with commas, line breaks, or quotes to be read correctly. |
 | escape_char                | string  | no       | -         | A single character that allows the quote or other special characters to appear inside a CSV field without ending the field. |
 
@@ -254,6 +256,30 @@ Only used when `sync_mode=update`. Supported values: `len_mtime` (default), `che
 
 - `len_mtime`: SKIP only when both `len` and `mtime` are equal, otherwise COPY.
 - `checksum`: SKIP only when `len` is equal and Hadoop `getFileChecksum` is equal, otherwise COPY (only valid when `update_strategy=strict`).
+
+### enable_file_split [boolean]
+
+Turn on the file splitting function; the default is false. It can be used when the file type is csv, text, json, or parquet and the file is not compressed.
+
+- `text`/`csv`/`json`: split by `file_split_size` and align to the next `row_delimiter` to avoid breaking records.
+- `parquet`: split by RowGroup (logical split), never breaks a RowGroup.
+
+**Recommendations**
+- Enable when reading a few large files and you want higher read parallelism.
+- Disable when reading many small files, or when parallelism is low (splitting adds overhead).
+
+**Limitations**
+- Not supported for compressed files (`compress_codec` != `none`) or archive files (`archive_compress_codec` != `none`); it will fall back to non-splitting.
+- For `text`/`csv`/`json`, the actual split size may be larger than `file_split_size` because the split end is aligned to the next `row_delimiter`.
+
+### file_split_size [long]
+
+The file split size in bytes, effective when `enable_file_split` is true. The default is 134217728 bytes (128 MB).
+
+**Tuning**
+- Start with the default (128MB). Decrease it if parallelism is under-utilized; increase it if the number of splits is too large.
+- Rough rule: `file_split_size ≈ file_size / desired_parallelism`.
+
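A minimal sketch of an HdfsFile source block that uses these two options, assuming the usual HdfsFile options (`path`, `file_format_type`, `fs.defaultFS`); the cluster address and path values are placeholders:

```hocon
source {
  HdfsFile {
    fs.defaultFS = "hdfs://namenode:8020"   # placeholder cluster address
    path = "/data/access_logs"              # placeholder directory of large text files
    file_format_type = "text"
    # Enable logical splitting so a single huge file can be read by several subtasks.
    enable_file_split = true
    # 128 MB, the default; split ends are aligned to the next row_delimiter.
    file_split_size = 134217728
  }
}
```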
 ### quote_char [string]
 
 A single character that encloses CSV fields, allowing fields with commas, line breaks, or quotes to be read correctly.
diff --git a/docs/en/connectors/source/LocalFile.md 
b/docs/en/connectors/source/LocalFile.md
index 5dcfad7cfa..72c01544c3 100644
--- a/docs/en/connectors/source/LocalFile.md
+++ b/docs/en/connectors/source/LocalFile.md
@@ -418,14 +418,27 @@ File modification time filter. The connector will filter some files base on the
 
 File modification time filter. The connector filters files based on the last modification end time (end time excluded). The default date format is `yyyy-MM-dd HH:mm:ss`.
 
-### enable_file_split [string]
+### enable_file_split [boolean]
 
 Turn on the file splitting function; the default is false. It can be used when the file type is csv, text, json, or parquet and the file is not compressed.
 
+**Recommendations**
+- Enable when reading a few large files and you want higher read parallelism.
+- Disable when reading many small files, or when parallelism is low (splitting adds overhead).
+
+**Limitations**
+- Not supported for compressed files (`compress_codec` != `none`) or archive files (`archive_compress_codec` != `none`); it will fall back to non-splitting.
+- For `text`/`csv`/`json`, the actual split size may be larger than `file_split_size` because the split end is aligned to the next `row_delimiter`.
+- LocalFile uses Hadoop LocalFileSystem internally; no extra Hadoop configuration is required.
+
 ### file_split_size [long]
 
 The file split size in bytes, effective when `enable_file_split` is true. The default is 134217728 bytes (128 MB).
 
+**Tuning**
+- Start with the default (128MB). Decrease it if parallelism is under-utilized; increase it if the number of splits is too large.
+- Rough rule: `file_split_size ≈ file_size / desired_parallelism`.
+
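A similar sketch for a LocalFile source reading a Parquet file, where splitting works at RowGroup granularity; the `path` value is a placeholder:

```hocon
source {
  LocalFile {
    path = "/tmp/events.parquet"   # placeholder local file
    file_format_type = "parquet"
    enable_file_split = true
    file_split_size = 67108864     # 64 MB; contiguous RowGroups are merged up to roughly this size per split
  }
}
```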
 ### quote_char [string]
 
 A single character that encloses CSV fields, allowing fields with commas, line breaks, or quotes to be read correctly.
diff --git a/docs/zh/connectors/source/HdfsFile.md 
b/docs/zh/connectors/source/HdfsFile.md
index 811059e6c5..d17b8da333 100644
--- a/docs/zh/connectors/source/HdfsFile.md
+++ b/docs/zh/connectors/source/HdfsFile.md
@@ -88,6 +88,8 @@ import ChangeLog from '../changelog/connector-file-hadoop.md';
 | common-options             |         | no | -         | Source plugin common parameters, see [Source Common Options](../source-common-options.md) for details. |
 | file_filter_modified_start | string  | no | -         | Filters files by last modification time. Start of the time range (inclusive); the time format is `yyyy-MM-dd HH:mm:ss`. |
 | file_filter_modified_end   | string  | no | -         | Filters files by last modification time. End of the time range (exclusive); the time format is `yyyy-MM-dd HH:mm:ss`. |
+| enable_file_split          | boolean | no | false     | Turn on large-file splitting to improve parallelism. Only supported for `text`/`csv`/`json`/`parquet` in non-compressed format (`compress_codec=none` and `archive_compress_codec=none`). |
+| file_split_size            | long    | no | 134217728 | Effective when `enable_file_split=true`, in bytes. `text`/`csv`/`json` are split by `file_split_size` and aligned to the next `row_delimiter`; `parquet` is split by RowGroup and a RowGroup is never broken. |
 | quote_char                 | string  | no | "         | A single character that encloses CSV fields, so fields containing commas, line breaks, or quotes are parsed correctly. |
 | escape_char                | string  | no | -         | A single character used to escape quotes or other special characters inside a CSV field so they do not end the field. |
 
@@ -255,6 +257,30 @@ abc.*
 
 - `len_mtime`: SKIP only when both `len` and `mtime` are the same, otherwise COPY.
 - `checksum`: SKIP only when `len` is the same and the Hadoop `getFileChecksum` matches, otherwise COPY (only effective when `update_strategy=strict`).
+
+### enable_file_split [boolean]
+
+Turn on large-file splitting; the default is false. Only supported for `csv`/`text`/`json`/`parquet` in non-compressed format (`compress_codec=none` and `archive_compress_codec=none`).
+
+- `text`/`csv`/`json`: split by `file_split_size` and aligned to the next `row_delimiter`, so a row/record is never broken.
+- `parquet`: logically split by RowGroup; a RowGroup is never broken.
+
+**Recommendations**
+- Suitable for: reading a few large files when higher parallelism is wanted for better throughput.
+- Not recommended for: reading many small files, or low-parallelism scenarios (splitting adds extra enumeration/scheduling overhead).
+
+**Limitations**
+- Compressed files (`compress_codec` != `none`) and archive files (`archive_compress_codec` != `none`) are not supported; splitting automatically falls back to non-splitting.
+- For `text`/`csv`/`json`, the actual split size may be slightly larger than `file_split_size` (because the split end is aligned to the next `row_delimiter`).
+
+### file_split_size [long]
+
+Effective when `enable_file_split=true`, in bytes. The default is 128MB (134217728).
+
+**Tuning**
+- Start from the default (128MB): decrease it if parallelism is under-utilized; increase it if the number of splits is too large.
+- Rule of thumb: `file_split_size ≈ file_size / desired_parallelism`.
+
 ### quote_char [string]
 
 A single character that encloses CSV fields, so fields containing commas, line breaks, or quotes are parsed correctly.
diff --git a/docs/zh/connectors/source/LocalFile.md 
b/docs/zh/connectors/source/LocalFile.md
index 0453b09083..ef6dabed25 100644
--- a/docs/zh/connectors/source/LocalFile.md
+++ b/docs/zh/connectors/source/LocalFile.md
@@ -423,10 +423,23 @@ null_format defines which strings can be treated as null.
 
 Turn on the file splitting function; the default is false. It can be used when the file type is csv, text, json, or parquet and the file is not compressed.
 
+**Recommendations**
+- Suitable for: reading a few large files when higher parallelism is wanted for better throughput.
+- Not recommended for: reading many small files, or low-parallelism scenarios (splitting adds extra enumeration/scheduling overhead).
+
+**Limitations**
+- Compressed files (`compress_codec` != `none`) and archive files (`archive_compress_codec` != `none`) are not supported; splitting automatically falls back to non-splitting.
+- For `text`/`csv`/`json`, the actual split size may be slightly larger than `file_split_size` (because the split end is aligned to the next `row_delimiter`).
+- LocalFile uses Hadoop LocalFileSystem (`file:///`) internally, so no extra Hadoop configuration is usually needed.
+
 ### file_split_size [long]
 
 The file split size in bytes, effective when `enable_file_split` is true. The default is 134217728 bytes (128 MB).
 
+**Tuning**
+- Start from the default (128MB): decrease it if parallelism is under-utilized; increase it if the number of splits is too large.
+- Rule of thumb: `file_split_size ≈ file_size / desired_parallelism`.
+
 ### quote_char [string]
 
 A single character that encloses CSV fields, so fields containing commas, line breaks, or quotes are parsed correctly.
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/exception/FileConnectorErrorCode.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/exception/FileConnectorErrorCode.java
index db30dfcb8f..23b3ea5fb3 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/exception/FileConnectorErrorCode.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/exception/FileConnectorErrorCode.java
@@ -30,7 +30,10 @@ public enum FileConnectorErrorCode implements 
SeaTunnelErrorCode {
     FILE_READ_FAILED("FILE-08", "File read failed"),
     BINARY_FILE_PART_ORDER_ERROR("FILE-09", "Binary file fragment order 
abnormality"),
     FILE_SPLIT_SIZE_ILLEGAL("FILE-10", "SplitSizeBytes must be greater than 
0"),
-    FILE_SPLIT_FAIL("FILE-11", "File split fail");
+    FILE_SPLIT_FAIL("FILE-11", "File split fail"),
+    FILE_NOT_FOUND("FILE-12", "File not found"),
+    FILE_ACCESS_DENIED("FILE-13", "File access denied"),
+    FILE_IO_TIMEOUT("FILE-14", "File IO timeout");
 
     private final String code;
     private final String description;
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/BaseMultipleTableFileSource.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/BaseMultipleTableFileSource.java
index 6626ccd2a9..5b7c91913f 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/BaseMultipleTableFileSource.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/BaseMultipleTableFileSource.java
@@ -31,10 +31,14 @@ import 
org.apache.seatunnel.connectors.seatunnel.file.source.reader.MultipleTabl
 import 
org.apache.seatunnel.connectors.seatunnel.file.source.split.DefaultFileSplitStrategy;
 import 
org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSourceSplit;
 import 
org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSplitStrategy;
+import 
org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSplitStrategyFactory;
 import 
org.apache.seatunnel.connectors.seatunnel.file.source.split.MultipleTableFileSourceSplitEnumerator;
+import 
org.apache.seatunnel.connectors.seatunnel.file.source.split.MultipleTableFileSplitStrategy;
 import 
org.apache.seatunnel.connectors.seatunnel.file.source.state.FileSourceState;
 
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
 
 public abstract class BaseMultipleTableFileSource
@@ -58,6 +62,21 @@ public abstract class BaseMultipleTableFileSource
         this.fileSplitStrategy = fileSplitStrategy;
     }
 
+    protected static FileSplitStrategy initFileSplitStrategy(
+            BaseMultipleTableFileSourceConfig sourceConfig) {
+        Map<String, FileSplitStrategy> splitStrategies = new HashMap<>();
+        for (BaseFileSourceConfig fileSourceConfig : 
sourceConfig.getFileSourceConfigs()) {
+            String tableId =
+                    
fileSourceConfig.getCatalogTable().getTableId().toTablePath().toString();
+            splitStrategies.put(
+                    tableId,
+                    FileSplitStrategyFactory.initFileSplitStrategy(
+                            fileSourceConfig.getBaseFileSourceConfig(),
+                            fileSourceConfig.getHadoopConfig()));
+        }
+        return new MultipleTableFileSplitStrategy(splitStrategies);
+    }
+
     @Override
     public Boundedness getBoundedness() {
         return Boundedness.BOUNDED;
@@ -91,6 +110,9 @@ public abstract class BaseMultipleTableFileSource
             SourceSplitEnumerator.Context<FileSourceSplit> enumeratorContext,
             FileSourceState checkpointState) {
         return new MultipleTableFileSourceSplitEnumerator(
-                enumeratorContext, baseMultipleTableFileSourceConfig, 
checkpointState);
+                enumeratorContext,
+                baseMultipleTableFileSourceConfig,
+                fileSplitStrategy,
+                checkpointState);
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
index e64eee5795..80b88cd170 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
@@ -50,6 +50,7 @@ import org.apache.commons.io.input.BoundedInputStream;
 import org.apache.hadoop.fs.FileChecksum;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.Seekable;
 
 import lombok.extern.slf4j.Slf4j;
 
@@ -522,13 +523,22 @@ public abstract class AbstractReadStrategy implements 
ReadStrategy {
 
     protected static InputStream safeSlice(InputStream in, long start, long 
length)
             throws IOException {
-        long toSkip = start;
-        while (toSkip > 0) {
-            long skipped = in.skip(toSkip);
-            if (skipped <= 0) {
-                throw new SeaTunnelException("skipped error");
+        if (start > 0) {
+            if (in instanceof Seekable) {
+                ((Seekable) in).seek(start);
+            } else {
+                long toSkip = start;
+                while (toSkip > 0) {
+                    long skipped = in.skip(toSkip);
+                    if (skipped <= 0) {
+                        throw new SeaTunnelException("skipped error");
+                    }
+                    toSkip -= skipped;
+                }
             }
-            toSkip -= skipped;
+        }
+        if (length < 0) {
+            return in;
         }
         return new BoundedInputStream(in, length);
     }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/AccordingToSplitSizeSplitStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/AccordingToSplitSizeSplitStrategy.java
index 11117fef88..ca9a60e979 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/AccordingToSplitSizeSplitStrategy.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/AccordingToSplitSizeSplitStrategy.java
@@ -17,88 +17,182 @@
 package org.apache.seatunnel.connectors.seatunnel.file.source.split;
 
 import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
 import 
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy;
 
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.security.AccessControlException;
+
+import java.io.Closeable;
+import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.io.InputStream;
+import java.io.InterruptedIOException;
+import java.net.SocketTimeoutException;
 import java.nio.charset.Charset;
+import java.nio.file.AccessDeniedException;
+import java.nio.file.NoSuchFileException;
+import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.List;
 
-public abstract class AccordingToSplitSizeSplitStrategy implements 
FileSplitStrategy {
+/**
+ * {@link AccordingToSplitSizeSplitStrategy} defines a split strategy for 
text-like files by using
+ * {@code rowDelimiter} as the minimum indivisible unit and generating {@link 
FileSourceSplit}s by
+ * merging one or more contiguous rows according to the configured split size.
+ *
+ * <p>This strategy will never break a row delimiter, ensuring each split 
starts at a row boundary.
+ *
+ * <p>To avoid scanning the whole file for large files, this strategy uses 
{@link FSDataInputStream}
+ * seek to locate the next delimiter around each split boundary.
+ */
+public class AccordingToSplitSizeSplitStrategy implements FileSplitStrategy, 
Closeable {
 
+    private static final int BUFFER_SIZE = 64 * 1024;
+
+    private final HadoopFileSystemProxy hadoopFileSystemProxy;
     private final long skipHeaderRowNumber;
     private final long splitSize;
     private final byte[] delimiterBytes;
-    private static final int BUFFER_SIZE = 64 * 1024;
 
     public AccordingToSplitSizeSplitStrategy(
-            String rowDelimiter, long skipHeaderRowNumber, String 
encodingName, long splitSize) {
+            HadoopConf hadoopConf,
+            String rowDelimiter,
+            long skipHeaderRowNumber,
+            String encodingName,
+            long splitSize) {
+        if (splitSize <= 0) {
+            throw new SeaTunnelRuntimeException(
+                    FileConnectorErrorCode.FILE_SPLIT_SIZE_ILLEGAL,
+                    "SplitSizeBytes must be greater than 0");
+        }
+        if (rowDelimiter == null || rowDelimiter.isEmpty()) {
+            throw new SeaTunnelRuntimeException(
+                    FileConnectorErrorCode.FILE_SPLIT_FAIL, "rowDelimiter must 
not be empty");
+        }
+        this.hadoopFileSystemProxy = new HadoopFileSystemProxy(hadoopConf);
         this.skipHeaderRowNumber = skipHeaderRowNumber;
         this.splitSize = splitSize;
         this.delimiterBytes = 
rowDelimiter.getBytes(Charset.forName(encodingName));
+        if (delimiterBytes.length == 0) {
+            throw new SeaTunnelRuntimeException(
+                    FileConnectorErrorCode.FILE_SPLIT_FAIL,
+                    "rowDelimiter must not be empty after encoding");
+        }
     }
 
     @Override
     public List<FileSourceSplit> split(String tableId, String filePath) {
+        String normalizedPath = normalizePath(filePath);
         List<FileSourceSplit> splits = new ArrayList<>();
-        long fileSize = safeGetFileSize(filePath);
+        long fileSize = safeGetFileSize(normalizedPath);
         if (fileSize == 0) {
             return splits;
         }
-        long currentStart = 0;
-        if (skipHeaderRowNumber > 0) {
-            currentStart = skipHeaderWithBuffer(filePath, skipHeaderRowNumber);
-        }
-        while (currentStart < fileSize) {
-            long tentativeEnd = currentStart + splitSize;
-            if (tentativeEnd >= fileSize) {
+        try (FSDataInputStream input = 
hadoopFileSystemProxy.getInputStream(normalizedPath)) {
+            long currentStart = 0;
+            if (skipHeaderRowNumber > 0) {
+                currentStart = skipLinesUsingBuffer(input, 
skipHeaderRowNumber);
+            }
+            while (currentStart < fileSize) {
+                long tentativeEnd = currentStart + splitSize;
+                if (tentativeEnd >= fileSize) {
+                    splits.add(
+                            new FileSourceSplit(
+                                    tableId,
+                                    normalizedPath,
+                                    currentStart,
+                                    fileSize - currentStart));
+                    break;
+                }
+                long actualEnd = findNextDelimiterWithSeek(input, 
tentativeEnd, fileSize);
+                if (actualEnd <= currentStart) {
+                    actualEnd = tentativeEnd;
+                }
                 splits.add(
                         new FileSourceSplit(
-                                tableId, filePath, currentStart, fileSize - 
currentStart));
-                break;
-            }
-            long actualEnd = findNextDelimiterWithBuffer(filePath, 
tentativeEnd);
-            if (actualEnd <= currentStart) {
-                actualEnd = tentativeEnd;
+                                tableId, normalizedPath, currentStart, 
actualEnd - currentStart));
+                currentStart = actualEnd;
             }
-            splits.add(
-                    new FileSourceSplit(tableId, filePath, currentStart, 
actualEnd - currentStart));
-            currentStart = actualEnd;
+            return splits;
+        } catch (IOException e) {
+            throw mapToRuntimeException(normalizedPath, "Split file", e);
         }
-        return splits;
     }
 
-    protected abstract InputStream getInputStream(String filePath) throws 
IOException;
-
-    protected abstract long getFileSize(String filePath) throws IOException;
-
     private long safeGetFileSize(String filePath) {
         try {
-            return getFileSize(filePath);
+            return hadoopFileSystemProxy.getFileStatus(filePath).getLen();
         } catch (IOException e) {
-            throw new 
SeaTunnelRuntimeException(FileConnectorErrorCode.FILE_READ_FAILED, e);
+            throw mapToRuntimeException(filePath, "Get file status", e);
         }
     }
 
-    private long skipHeaderWithBuffer(String filePath, long skipLines) {
-        try (InputStream input = getInputStream(filePath)) {
-            return skipLinesUsingBuffer(input, skipLines);
-        } catch (IOException e) {
-            throw new 
SeaTunnelRuntimeException(FileConnectorErrorCode.FILE_READ_FAILED, e);
+    private static SeaTunnelRuntimeException mapToRuntimeException(
+            String filePath, String operation, IOException e) {
+        IOException unwrapped = unwrapRemoteException(e);
+        FileConnectorErrorCode errorCode = 
mapIOExceptionToErrorCode(unwrapped);
+        String message =
+                String.format(
+                        "%s for [%s] failed, cause=%s: %s",
+                        operation,
+                        filePath,
+                        unwrapped.getClass().getSimpleName(),
+                        unwrapped.getMessage());
+        return new SeaTunnelRuntimeException(errorCode, message, unwrapped);
+    }
+
+    private static FileConnectorErrorCode 
mapIOExceptionToErrorCode(IOException e) {
+        if (hasCause(e, FileNotFoundException.class) || hasCause(e, 
NoSuchFileException.class)) {
+            return FileConnectorErrorCode.FILE_NOT_FOUND;
+        }
+        if (hasCause(e, AccessDeniedException.class) || hasCause(e, 
AccessControlException.class)) {
+            return FileConnectorErrorCode.FILE_ACCESS_DENIED;
+        }
+        if (hasCause(e, SocketTimeoutException.class)
+                || hasCause(e, InterruptedIOException.class)) {
+            return FileConnectorErrorCode.FILE_IO_TIMEOUT;
+        }
+        return FileConnectorErrorCode.FILE_READ_FAILED;
+    }
+
+    private static boolean hasCause(Throwable throwable, Class<? extends 
Throwable> type) {
+        Throwable current = throwable;
+        while (current != null) {
+            if (type.isInstance(current)) {
+                return true;
+            }
+            current = current.getCause();
         }
+        return false;
     }
 
-    private long skipLinesUsingBuffer(InputStream is, long skipLines) throws 
IOException {
+    private static IOException unwrapRemoteException(IOException e) {
+        if (e instanceof RemoteException) {
+            return ((RemoteException) e)
+                    .unwrapRemoteException(
+                            FileNotFoundException.class,
+                            NoSuchFileException.class,
+                            AccessControlException.class,
+                            AccessDeniedException.class,
+                            SocketTimeoutException.class,
+                            InterruptedIOException.class);
+        }
+        return e;
+    }
+
+    private long skipLinesUsingBuffer(FSDataInputStream input, long skipLines) 
throws IOException {
+        input.seek(0);
         byte[] buffer = new byte[BUFFER_SIZE];
-        long matched = 0;
+        int matched = 0;
         long lines = 0;
         long pos = 0;
         int n;
-        while ((n = is.read(buffer)) != -1) {
+        while ((n = input.read(buffer)) != -1) {
             for (int i = 0; i < n; i++) {
                 pos++;
-                if (buffer[i] == delimiterBytes[(int) matched]) {
+                if (buffer[i] == delimiterBytes[matched]) {
                     matched++;
                     if (matched == delimiterBytes.length) {
                         matched = 0;
@@ -108,53 +202,59 @@ public abstract class AccordingToSplitSizeSplitStrategy 
implements FileSplitStra
                         }
                     }
                 } else {
-                    matched = 0;
+                    matched = buffer[i] == delimiterBytes[0] ? 1 : 0;
                 }
             }
         }
-
         return pos;
     }
 
-    private long findNextDelimiterWithBuffer(String filePath, long startPos) {
-        try (InputStream is = getInputStream(filePath)) {
-            long skipped = skipManually(is, startPos);
-            if (skipped < startPos) {
-                return startPos;
-            }
-            byte[] buffer = new byte[BUFFER_SIZE];
-            long matched = 0;
-            long pos = startPos;
-            int n;
-            while ((n = is.read(buffer)) != -1) {
-                for (int i = 0; i < n; i++) {
-                    pos++;
-                    if (buffer[i] == delimiterBytes[(int) matched]) {
-                        matched++;
-                        if (matched == delimiterBytes.length) {
-                            return pos;
+    private long findNextDelimiterWithSeek(FSDataInputStream input, long 
startPos, long fileSize)
+            throws IOException {
+        long scanStart = Math.max(0, startPos - (delimiterBytes.length - 1));
+        input.seek(scanStart);
+        byte[] buffer = new byte[BUFFER_SIZE];
+        int matched = 0;
+        long pos = scanStart;
+        int n;
+        while ((n = input.read(buffer)) != -1) {
+            for (int i = 0; i < n; i++) {
+                pos++;
+                if (buffer[i] == delimiterBytes[matched]) {
+                    matched++;
+                    if (matched == delimiterBytes.length) {
+                        long endPos = pos;
+                        if (endPos >= startPos) {
+                            return endPos;
                         }
-                    } else {
                         matched = 0;
                     }
+                } else {
+                    matched = buffer[i] == delimiterBytes[0] ? 1 : 0;
                 }
             }
-            return pos;
-
-        } catch (IOException e) {
-            throw new 
SeaTunnelRuntimeException(FileConnectorErrorCode.FILE_READ_FAILED, e);
         }
+        return Math.min(fileSize, pos);
     }
 
-    private long skipManually(InputStream is, long bytesToSkip) throws 
IOException {
-        byte[] buffer = new byte[BUFFER_SIZE];
-        long total = 0;
-        while (total < bytesToSkip) {
-            long toRead = Math.min(buffer.length, bytesToSkip - total);
-            int n = is.read(buffer, 0, (int) toRead);
-            if (n == -1) break;
-            total += n;
-        }
-        return total;
+    @Override
+    public void close() throws IOException {
+        hadoopFileSystemProxy.close();
+    }
+
+    private static String normalizePath(String filePath) {
+        if (filePath == null) {
+            return null;
+        }
+        if (filePath.contains("://")) {
+            return filePath;
+        }
+        if (filePath.length() >= 3
+                && Character.isLetter(filePath.charAt(0))
+                && filePath.charAt(1) == ':'
+                && (filePath.charAt(2) == '\\' || filePath.charAt(2) == '/')) {
+            return Paths.get(filePath).toUri().toString();
+        }
+        return filePath;
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSourceSplit.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSourceSplit.java
index fea28898b0..15053b162d 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSourceSplit.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSourceSplit.java
@@ -21,6 +21,8 @@ import org.apache.seatunnel.api.source.SourceSplit;
 
 import lombok.Getter;
 
+import java.io.IOException;
+import java.io.ObjectInputStream;
 import java.util.Objects;
 
 public class FileSourceSplit implements SourceSplit {
@@ -48,6 +50,15 @@ public class FileSourceSplit implements SourceSplit {
         this.length = length;
     }
 
+    private void readObject(ObjectInputStream in) throws IOException, 
ClassNotFoundException {
+        in.defaultReadObject();
+        // Compatibility: old checkpoints (before file-split fields) 
deserialize with
+        // start=0/length=0.
+        if (start == 0L && length == 0L) {
+            length = -1L;
+        }
+    }
+
     @Override
     public String splitId() {
         // In order to be compatible with the split before the upgrade, when 
tableId is null,
@@ -55,6 +66,9 @@ public class FileSourceSplit implements SourceSplit {
         if (tableId == null) {
             return filePath;
         }
+        if (start == 0L && length < 0L) {
+            return tableId + "_" + filePath;
+        }
         return tableId + "_" + filePath + "_" + start;
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/split/LocalFileSplitStrategyFactory.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSplitStrategyFactory.java
similarity index 79%
rename from 
seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/split/LocalFileSplitStrategyFactory.java
rename to 
seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSplitStrategyFactory.java
index d6b632e164..a801927e9d 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/split/LocalFileSplitStrategyFactory.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSplitStrategyFactory.java
@@ -14,22 +14,23 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.seatunnel.connectors.seatunnel.file.local.source.split;
+package org.apache.seatunnel.connectors.seatunnel.file.source.split;
 
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.file.config.ArchiveCompressFormat;
 import org.apache.seatunnel.connectors.seatunnel.file.config.CompressFormat;
 import 
org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSourceOptions;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
-import 
org.apache.seatunnel.connectors.seatunnel.file.source.split.DefaultFileSplitStrategy;
-import 
org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSplitStrategy;
-import 
org.apache.seatunnel.connectors.seatunnel.file.source.split.ParquetFileSplitStrategy;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+
+import java.util.Objects;
 
 import static 
org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSourceOptions.DEFAULT_ROW_DELIMITER;
 
-public class LocalFileSplitStrategyFactory {
+public class FileSplitStrategyFactory {
 
-    public static FileSplitStrategy initFileSplitStrategy(ReadonlyConfig 
readonlyConfig) {
+    public static FileSplitStrategy initFileSplitStrategy(
+            ReadonlyConfig readonlyConfig, HadoopConf hadoopConf) {
         if (!readonlyConfig.get(FileBaseSourceOptions.ENABLE_FILE_SPLIT)) {
             return new DefaultFileSplitStrategy();
         }
@@ -41,9 +42,13 @@ public class LocalFileSplitStrategyFactory {
                         != ArchiveCompressFormat.NONE) {
             return new DefaultFileSplitStrategy();
         }
+
+        Objects.requireNonNull(
+                hadoopConf, "hadoopConf must not be null when file split is 
enabled");
+
         long fileSplitSize = 
readonlyConfig.get(FileBaseSourceOptions.FILE_SPLIT_SIZE);
         if (FileFormat.PARQUET == 
readonlyConfig.get(FileBaseSourceOptions.FILE_FORMAT_TYPE)) {
-            return new ParquetFileSplitStrategy(fileSplitSize);
+            return new ParquetFileSplitStrategy(fileSplitSize, hadoopConf);
         }
         String rowDelimiter =
                 
!readonlyConfig.getOptional(FileBaseSourceOptions.ROW_DELIMITER).isPresent()
@@ -54,7 +59,7 @@ public class LocalFileSplitStrategyFactory {
                         ? 1L
                         : 
readonlyConfig.get(FileBaseSourceOptions.SKIP_HEADER_ROW_NUMBER);
         String encodingName = 
readonlyConfig.get(FileBaseSourceOptions.ENCODING);
-        return new LocalFileAccordingToSplitSizeSplitStrategy(
-                rowDelimiter, skipHeaderRowNumber, encodingName, 
fileSplitSize);
+        return new AccordingToSplitSizeSplitStrategy(
+                hadoopConf, rowDelimiter, skipHeaderRowNumber, encodingName, 
fileSplitSize);
     }
 }
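As a hedged illustration of the options this factory reads, a text-format source that would take the `AccordingToSplitSizeSplitStrategy` path; values are placeholders, and `compress_codec`/`archive_compress_codec` must stay `none`, otherwise the factory falls back to `DefaultFileSplitStrategy`:

```hocon
source {
  LocalFile {
    path = "/tmp/big.csv"          # placeholder
    file_format_type = "csv"
    encoding = "UTF-8"
    row_delimiter = "\n"
    skip_header_row_number = 1     # header rows skipped before the first split starts
    enable_file_split = true
    file_split_size = 134217728    # 128 MB, the default
  }
}
```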
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/MultipleTableFileSourceSplitEnumerator.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/MultipleTableFileSourceSplitEnumerator.java
index 31b46140d5..b0c5dd2ad9 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/MultipleTableFileSourceSplitEnumerator.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/MultipleTableFileSourceSplitEnumerator.java
@@ -26,9 +26,11 @@ import org.apache.commons.collections4.CollectionUtils;
 
 import lombok.extern.slf4j.Slf4j;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -41,6 +43,8 @@ import java.util.stream.Collectors;
 public class MultipleTableFileSourceSplitEnumerator
         implements SourceSplitEnumerator<FileSourceSplit, FileSourceState> {
 
+    private static final int LOG_SPLIT_ID_LIMIT = 50;
+
     private final Context<FileSourceSplit> context;
     private final Set<FileSourceSplit> allSplit;
     private final Set<FileSourceSplit> assignedSplit;
@@ -78,15 +82,42 @@ public class MultipleTableFileSourceSplitEnumerator
         this.assignedSplit.addAll(fileSourceState.getAssignedSplit());
     }
 
+    public MultipleTableFileSourceSplitEnumerator(
+            Context<FileSourceSplit> context,
+            BaseMultipleTableFileSourceConfig multipleTableFileSourceConfig,
+            FileSplitStrategy fileSplitStrategy,
+            FileSourceState fileSourceState) {
+        this(context, multipleTableFileSourceConfig, fileSplitStrategy);
+        this.assignedSplit.addAll(fileSourceState.getAssignedSplit());
+    }
+
     @Override
     public void open() {
+        boolean hasMultiSplits = false;
+        Map<String, Integer> splitCountByTable = new HashMap<>();
         for (Map.Entry<String, List<String>> filePathEntry : 
filePathMap.entrySet()) {
             String tableId = filePathEntry.getKey();
             List<String> filePaths = filePathEntry.getValue();
             for (String filePath : filePaths) {
-                allSplit.addAll(fileSplitStrategy.split(tableId, filePath));
+                List<FileSourceSplit> splits = 
fileSplitStrategy.split(tableId, filePath);
+                splitCountByTable.merge(tableId, splits.size(), Integer::sum);
+                allSplit.addAll(splits);
+                if (splits.size() > 1) {
+                    hasMultiSplits = true;
+                    log.info(
+                            "Split file [{}] for table [{}] into {} splits",
+                            filePath,
+                            tableId,
+                            splits.size());
+                }
             }
         }
+        if (hasMultiSplits) {
+            log.info(
+                    "Split enumeration finished, total splits: {}, splits by 
table: {}",
+                    allSplit.size(),
+                    splitCountByTable);
+        }
     }
 
     @Override
@@ -146,13 +177,27 @@ public class MultipleTableFileSourceSplitEnumerator
         log.info(
                 "SubTask {} is assigned to [{}], size {}",
                 taskId,
-                currentTaskSplits.stream()
-                        .map(FileSourceSplit::splitId)
-                        .collect(Collectors.joining(",")),
+                summarizeSplitIds(currentTaskSplits),
                 currentTaskSplits.size());
         context.signalNoMoreSplits(taskId);
     }
 
+    private static String summarizeSplitIds(List<FileSourceSplit> splits) {
+        if (splits.isEmpty()) {
+            return "";
+        }
+        if (splits.size() <= LOG_SPLIT_ID_LIMIT) {
+            return 
splits.stream().map(FileSourceSplit::splitId).collect(Collectors.joining(","));
+        }
+        return splits.stream()
+                        .limit(LOG_SPLIT_ID_LIMIT)
+                        .map(FileSourceSplit::splitId)
+                        .collect(Collectors.joining(","))
+                + ",...("
+                + (splits.size() - LOG_SPLIT_ID_LIMIT)
+                + " more)";
+    }
+
     private static int getSplitOwner(int assignCount, int numReaders) {
         return assignCount % numReaders;
     }
@@ -169,6 +214,19 @@ public class MultipleTableFileSourceSplitEnumerator
 
     @Override
     public void close() throws IOException {
-        // do nothing
+        if (fileSplitStrategy instanceof Closeable) {
+            ((Closeable) fileSplitStrategy).close();
+            return;
+        }
+        if (fileSplitStrategy instanceof AutoCloseable) {
+            try {
+                ((AutoCloseable) fileSplitStrategy).close();
+            } catch (Exception e) {
+                if (e instanceof IOException) {
+                    throw (IOException) e;
+                }
+                throw new IOException(e);
+            }
+        }
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/MultipleTableFileSplitStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/MultipleTableFileSplitStrategy.java
new file mode 100644
index 0000000000..ce76e3c9b8
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/MultipleTableFileSplitStrategy.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.connectors.seatunnel.file.source.split;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+public class MultipleTableFileSplitStrategy implements FileSplitStrategy, 
Closeable {
+
+    private final Map<String, FileSplitStrategy> delegateStrategies;
+    private final FileSplitStrategy fallbackStrategy;
+
+    public MultipleTableFileSplitStrategy(Map<String, FileSplitStrategy> 
delegateStrategies) {
+        this.delegateStrategies = Objects.requireNonNull(delegateStrategies, 
"delegateStrategies");
+        this.fallbackStrategy = new DefaultFileSplitStrategy();
+    }
+
+    @Override
+    public java.util.List<FileSourceSplit> split(String tableId, String 
filePath) {
+        FileSplitStrategy delegate = delegateStrategies.get(tableId);
+        if (delegate == null) {
+            return fallbackStrategy.split(tableId, filePath);
+        }
+        return delegate.split(tableId, filePath);
+    }
+
+    @Override
+    public void close() throws IOException {
+        IOException exception = null;
+        Set<FileSplitStrategy> uniqueStrategies = new 
HashSet<>(delegateStrategies.values());
+        for (FileSplitStrategy strategy : uniqueStrategies) {
+            try {
+                if (strategy instanceof Closeable) {
+                    ((Closeable) strategy).close();
+                    continue;
+                }
+                if (strategy instanceof AutoCloseable) {
+                    ((AutoCloseable) strategy).close();
+                }
+            } catch (Exception e) {
+                IOException current =
+                        e instanceof IOException ? (IOException) e : new 
IOException(e);
+                if (exception == null) {
+                    exception = current;
+                } else {
+                    exception.addSuppressed(current);
+                }
+            }
+        }
+        if (exception != null) {
+            throw exception;
+        }
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/ParquetFileSplitStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/ParquetFileSplitStrategy.java
index 48b7adc102..93a12d3213 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/ParquetFileSplitStrategy.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/ParquetFileSplitStrategy.java
@@ -18,7 +18,9 @@
 package org.apache.seatunnel.connectors.seatunnel.file.source.split;
 
 import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
 import 
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -26,6 +28,7 @@ import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.hadoop.util.HadoopInputFile;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -46,9 +49,10 @@ import java.util.List;
  * <p>This design enables efficient parallel reading of Parquet files while 
preserving Parquet
  * format semantics and avoiding invalid byte-level splits.
  */
-public class ParquetFileSplitStrategy implements FileSplitStrategy {
+public class ParquetFileSplitStrategy implements FileSplitStrategy, Closeable {
 
     private final long splitSizeBytes;
+    private final HadoopFileSystemProxy hadoopFileSystemProxy;
 
     public ParquetFileSplitStrategy(long splitSizeBytes) {
         if (splitSizeBytes <= 0) {
@@ -57,6 +61,17 @@ public class ParquetFileSplitStrategy implements 
FileSplitStrategy {
                     "SplitSizeBytes must be greater than 0");
         }
         this.splitSizeBytes = splitSizeBytes;
+        this.hadoopFileSystemProxy = null;
+    }
+
+    public ParquetFileSplitStrategy(long splitSizeBytes, HadoopConf 
hadoopConf) {
+        if (splitSizeBytes <= 0) {
+            throw new SeaTunnelRuntimeException(
+                    FileConnectorErrorCode.FILE_SPLIT_SIZE_ILLEGAL,
+                    "SplitSizeBytes must be greater than 0");
+        }
+        this.splitSizeBytes = splitSizeBytes;
+        this.hadoopFileSystemProxy = new HadoopFileSystemProxy(hadoopConf);
     }
 
     @Override
@@ -78,41 +93,74 @@ public class ParquetFileSplitStrategy implements 
FileSplitStrategy {
             return splits;
         }
         long currentStart = 0;
-        long currentLength = 0;
+        long currentEnd = 0;
         boolean hasOpenSplit = false;
         for (BlockMetaData block : rowGroups) {
             long rgStart = block.getStartingPos();
             long rgSize = block.getCompressedSize();
+            long rgEnd = rgStart + rgSize;
             // start a new split
             if (!hasOpenSplit) {
                 currentStart = rgStart;
-                currentLength = rgSize;
+                currentEnd = rgEnd;
                 hasOpenSplit = true;
                 continue;
             }
             // exceeds threshold, close current split
-            if (currentLength + rgSize > splitSizeBytes) {
-                splits.add(new FileSourceSplit(tableId, filePath, 
currentStart, currentLength));
+            if (rgEnd - currentStart > splitSizeBytes) {
+                splits.add(
+                        new FileSourceSplit(
+                                tableId, filePath, currentStart, currentEnd - 
currentStart));
                 // start next split
                 currentStart = rgStart;
-                currentLength = rgSize;
+                currentEnd = rgEnd;
             } else {
-                currentLength += rgSize;
+                currentEnd = rgEnd;
             }
         }
         // last split
-        if (hasOpenSplit && currentLength > 0) {
-            splits.add(new FileSourceSplit(tableId, filePath, currentStart, 
currentLength));
+        if (hasOpenSplit && currentEnd > currentStart) {
+            splits.add(
+                    new FileSourceSplit(
+                            tableId, filePath, currentStart, currentEnd - 
currentStart));
         }
         return splits;
     }
 
     private List<BlockMetaData> readRowGroups(String filePath) throws 
IOException {
         Path path = new Path(filePath);
-        Configuration conf = new Configuration();
-        try (ParquetFileReader reader =
-                ParquetFileReader.open(HadoopInputFile.fromPath(path, conf))) {
-            return reader.getFooter().getBlocks();
+        if (hadoopFileSystemProxy == null) {
+            Configuration conf = new Configuration();
+            try (ParquetFileReader reader =
+                    ParquetFileReader.open(HadoopInputFile.fromPath(path, 
conf))) {
+                return reader.getFooter().getBlocks();
+            }
+        }
+        try {
+            return hadoopFileSystemProxy.doWithHadoopAuth(
+                    (configuration, userGroupInformation) -> {
+                        try (ParquetFileReader reader =
+                                ParquetFileReader.open(
+                                        HadoopInputFile.fromPath(path, 
configuration))) {
+                            return reader.getFooter().getBlocks();
+                        }
+                    });
+        } catch (Exception e) {
+            if (e instanceof IOException) {
+                throw (IOException) e;
+            }
+            if (e instanceof RuntimeException) {
+                throw (RuntimeException) e;
+            }
+            throw new IOException(e);
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (hadoopFileSystemProxy == null) {
+            return;
         }
+        hadoopFileSystemProxy.close();
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategyTest.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategyTest.java
index 0b4e941364..c330d8ba36 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategyTest.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategyTest.java
@@ -36,6 +36,7 @@ import org.apache.avro.util.Utf8;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.Seekable;
 import org.apache.parquet.avro.AvroParquetWriter;
 import org.apache.parquet.hadoop.ParquetWriter;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
@@ -45,8 +46,11 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.condition.DisabledOnOs;
 import org.junit.jupiter.api.condition.OS;
 
+import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
 import java.text.SimpleDateFormat;
 import java.util.Date;
 import java.util.HashMap;
@@ -57,6 +61,37 @@ import static 
org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME
 
 public class AbstractReadStrategyTest {
 
+    @Test
+    void testSafeSliceUsesSeekForSeekableStream() throws Exception {
+        byte[] data = "0123456789".getBytes(StandardCharsets.UTF_8);
+        TrackingSeekableInputStream in = new TrackingSeekableInputStream(data);
+
+        try (InputStream sliced = AbstractReadStrategy.safeSlice(in, 5, 3)) {
+            byte[] buffer = new byte[10];
+            int n = sliced.read(buffer);
+            Assertions.assertEquals(3, n);
+            Assertions.assertEquals("567", new String(buffer, 0, n, 
StandardCharsets.UTF_8));
+            Assertions.assertTrue(in.seekCalled);
+        }
+    }
+
+    @Test
+    void testSafeSliceReadsToEndWhenLengthIsNegative() throws Exception {
+        byte[] data = "0123456789".getBytes(StandardCharsets.UTF_8);
+        TrackingSeekableInputStream in = new TrackingSeekableInputStream(data);
+
+        try (InputStream sliced = AbstractReadStrategy.safeSlice(in, 5, -1)) {
+            ByteArrayOutputStream out = new ByteArrayOutputStream();
+            byte[] buffer = new byte[4];
+            int n;
+            while ((n = sliced.read(buffer)) != -1) {
+                out.write(buffer, 0, n);
+            }
+            Assertions.assertEquals("56789", new String(out.toByteArray(), 
StandardCharsets.UTF_8));
+            Assertions.assertTrue(in.seekCalled);
+        }
+    }
+
     @DisabledOnOs(OS.WINDOWS)
     @Test
     public void testReadDirectorySkipHiddenDirectories() throws Exception {
@@ -146,6 +181,52 @@ public class AbstractReadStrategyTest {
         }
     }
 
+    private static class TrackingSeekableInputStream extends InputStream 
implements Seekable {
+        private final byte[] data;
+        private int pos;
+        private boolean seekCalled;
+
+        private TrackingSeekableInputStream(byte[] data) {
+            this.data = data;
+            this.pos = 0;
+        }
+
+        @Override
+        public int read() {
+            if (pos >= data.length) {
+                return -1;
+            }
+            return data[pos++] & 0xFF;
+        }
+
+        @Override
+        public int read(byte[] b, int off, int len) {
+            if (pos >= data.length) {
+                return -1;
+            }
+            int toRead = Math.min(len, data.length - pos);
+            System.arraycopy(data, pos, b, off, toRead);
+            pos += toRead;
+            return toRead;
+        }
+
+        @Override
+        public void seek(long newPos) {
+            this.seekCalled = true;
+            this.pos = (int) newPos;
+        }
+
+        @Override
+        public long getPos() {
+            return pos;
+        }
+
+        @Override
+        public boolean seekToNewSource(long targetPos) {
+            return false;
+        }
+    }
+
     @Test
     void testBothStartAndEndWithinRange() throws Exception {
         try (CsvReadStrategy strategy = new CsvReadStrategy()) {
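
The two safeSlice tests above pin down its contract: for a Seekable stream it positions
with seek(start) instead of skipping bytes, and a negative length means "read to end of
file". The fragment below is a hedged sketch of slicing a Seekable Hadoop stream to a
split's byte range; whether safeSlice is visible to callers outside the reader package,
and the exact stream type the read strategies use, are assumptions here.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.seatunnel.connectors.seatunnel.file.source.reader.AbstractReadStrategy;

    import java.io.InputStream;

    public class SafeSliceSketch {
        public static void main(String[] args) throws Exception {
            FileSystem fs = FileSystem.get(new Configuration());
            // hypothetical file; FSDataInputStream implements Seekable, so the slice seeks
            try (FSDataInputStream in = fs.open(new Path("file:///tmp/test.txt"));
                    InputStream slice = AbstractReadStrategy.safeSlice(in, 8L, 8L)) {
                byte[] buf = new byte[16];
                int n = slice.read(buf); // reads at most bytes [8, 16) of the file
                System.out.println(n);
            }
        }
    }
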
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSourceSplitCompatibilityTest.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSourceSplitCompatibilityTest.java
new file mode 100644
index 0000000000..9bbca0c1b9
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSourceSplitCompatibilityTest.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.connectors.seatunnel.file.source.split;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Assumptions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import javax.tools.JavaCompiler;
+import javax.tools.ToolProvider;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.lang.reflect.Constructor;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+public class FileSourceSplitCompatibilityTest {
+
+    private static final String LEGACY_SPLIT_CLASS_NAME =
+            
"org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSourceSplit";
+
+    @TempDir private Path tempDir;
+
+    @Test
+    void testDeserializeLegacyTwoArgSplitDefaultsToWholeFile() throws 
Exception {
+        byte[] legacyBytes = serializeLegacySplit(tempDir, "t", 
"file:///tmp/test.txt");
+        FileSourceSplit split = deserialize(legacyBytes);
+
+        Assertions.assertEquals("t", split.getTableId());
+        Assertions.assertEquals("file:///tmp/test.txt", split.getFilePath());
+        Assertions.assertEquals(0L, split.getStart());
+        Assertions.assertEquals(-1L, split.getLength());
+        Assertions.assertEquals("t_file:///tmp/test.txt", split.splitId());
+    }
+
+    @Test
+    void testDeserializeLegacySingleArgSplitDefaultsToWholeFile() throws 
Exception {
+        byte[] legacyBytes = serializeLegacySplit(tempDir, 
"file:///tmp/test.txt");
+        FileSourceSplit split = deserialize(legacyBytes);
+
+        Assertions.assertNull(split.getTableId());
+        Assertions.assertEquals("file:///tmp/test.txt", split.getFilePath());
+        Assertions.assertEquals(0L, split.getStart());
+        Assertions.assertEquals(-1L, split.getLength());
+        Assertions.assertEquals("file:///tmp/test.txt", split.splitId());
+    }
+
+    private static FileSourceSplit deserialize(byte[] bytes) throws Exception {
+        try (ObjectInputStream in = new ObjectInputStream(new 
ByteArrayInputStream(bytes))) {
+            Object obj = in.readObject();
+            Assertions.assertTrue(obj instanceof FileSourceSplit);
+            return (FileSourceSplit) obj;
+        }
+    }
+
+    private static byte[] serializeLegacySplit(Path tempDir, String tableId, 
String filePath)
+            throws Exception {
+        Class<?> legacyClass = compileAndLoadLegacyClass(tempDir);
+        Constructor<?> ctor = legacyClass.getConstructor(String.class, 
String.class);
+        Object legacySplit = ctor.newInstance(tableId, filePath);
+        return serialize(legacySplit);
+    }
+
+    private static byte[] serializeLegacySplit(Path tempDir, String splitId) 
throws Exception {
+        Class<?> legacyClass = compileAndLoadLegacyClass(tempDir);
+        Constructor<?> ctor = legacyClass.getConstructor(String.class);
+        Object legacySplit = ctor.newInstance(splitId);
+        return serialize(legacySplit);
+    }
+
+    private static byte[] serialize(Object legacySplit) throws Exception {
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        try (ObjectOutputStream oos = new ObjectOutputStream(out)) {
+            oos.writeObject(legacySplit);
+        }
+        return out.toByteArray();
+    }
+
+    private static Class<?> compileAndLoadLegacyClass(Path tempDir) throws 
Exception {
+        JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
+        Assumptions.assumeTrue(
+                compiler != null, "JDK compiler is required for legacy 
compatibility test");
+
+        Path sourceRoot = tempDir.resolve("legacy-src");
+        Path outputRoot = tempDir.resolve("legacy-out");
+        Path sourceFile =
+                sourceRoot.resolve(
+                        
"org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSourceSplit.java");
+        Files.createDirectories(sourceFile.getParent());
+        Files.createDirectories(outputRoot);
+
+        Files.write(sourceFile, 
legacySourceCode().getBytes(StandardCharsets.UTF_8));
+
+        String classpath = System.getProperty("java.class.path");
+        int result =
+                compiler.run(
+                        null,
+                        null,
+                        null,
+                        "-classpath",
+                        classpath,
+                        "-d",
+                        outputRoot.toString(),
+                        sourceFile.toString());
+        Assertions.assertEquals(0, result, "Failed to compile legacy 
FileSourceSplit");
+
+        URL[] urls = new URL[] {outputRoot.toUri().toURL()};
+        try (ChildFirstClassLoader loader =
+                new ChildFirstClassLoader(
+                        urls, 
FileSourceSplitCompatibilityTest.class.getClassLoader())) {
+            return Class.forName(LEGACY_SPLIT_CLASS_NAME, true, loader);
+        }
+    }
+
+    private static String legacySourceCode() {
+        return "package 
org.apache.seatunnel.connectors.seatunnel.file.source.split;\n"
+                + "\n"
+                + "import org.apache.seatunnel.api.source.SourceSplit;\n"
+                + "\n"
+                + "import java.util.Objects;\n"
+                + "\n"
+                + "public class FileSourceSplit implements SourceSplit {\n"
+                + "    private static final long serialVersionUID = 1L;\n"
+                + "\n"
+                + "    private final String tableId;\n"
+                + "    private final String filePath;\n"
+                + "\n"
+                + "    public FileSourceSplit(String splitId) {\n"
+                + "        this.filePath = splitId;\n"
+                + "        this.tableId = null;\n"
+                + "    }\n"
+                + "\n"
+                + "    public FileSourceSplit(String tableId, String filePath) 
{\n"
+                + "        this.tableId = tableId;\n"
+                + "        this.filePath = filePath;\n"
+                + "    }\n"
+                + "\n"
+                + "    @Override\n"
+                + "    public String splitId() {\n"
+                + "        if (tableId == null) {\n"
+                + "            return filePath;\n"
+                + "        }\n"
+                + "        return tableId + \"_\" + filePath;\n"
+                + "    }\n"
+                + "\n"
+                + "    @Override\n"
+                + "    public boolean equals(Object o) {\n"
+                + "        if (this == o) {\n"
+                + "            return true;\n"
+                + "        }\n"
+                + "        if (o == null || getClass() != o.getClass()) {\n"
+                + "            return false;\n"
+                + "        }\n"
+                + "        FileSourceSplit that = (FileSourceSplit) o;\n"
+                + "        return Objects.equals(tableId, that.tableId)\n"
+                + "                && Objects.equals(filePath, 
that.filePath);\n"
+                + "    }\n"
+                + "\n"
+                + "    @Override\n"
+                + "    public int hashCode() {\n"
+                + "        return Objects.hash(tableId, filePath);\n"
+                + "    }\n"
+                + "}\n";
+    }
+
+    private static final class ChildFirstClassLoader extends URLClassLoader {
+        private ChildFirstClassLoader(URL[] urls, ClassLoader parent) {
+            super(urls, parent);
+        }
+
+        @Override
+        protected Class<?> loadClass(String name, boolean resolve) throws 
ClassNotFoundException {
+            synchronized (getClassLoadingLock(name)) {
+                if (LEGACY_SPLIT_CLASS_NAME.equals(name)) {
+                    Class<?> loaded = findLoadedClass(name);
+                    if (loaded == null) {
+                        loaded = findClass(name);
+                    }
+                    if (resolve) {
+                        resolveClass(loaded);
+                    }
+                    return loaded;
+                }
+                return super.loadClass(name, resolve);
+            }
+        }
+    }
+}
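
The compatibility test above shows that splits serialized by the old one- and
two-argument FileSourceSplit deserialize with start 0 and length -1, i.e. "read the
whole file", so checkpoints taken before this change keep working. New code can carry
an explicit byte range through the four-argument constructor used in
ParquetFileSplitStrategy above. A short sketch of both shapes follows; that the
surviving two-argument constructor applies the same whole-file defaults is assumed
here, the test only proves it for deserialized legacy objects.

    import org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSourceSplit;

    public class SplitShapesSketch {
        public static void main(String[] args) {
            // explicit byte range: bytes [0, 1024) of the file
            FileSourceSplit ranged =
                    new FileSourceSplit("t", "file:///tmp/test.txt", 0L, 1024L);

            // legacy shape, assumed to default to the whole file (start 0, length -1)
            FileSourceSplit wholeFile = new FileSourceSplit("t", "file:///tmp/test.txt");

            System.out.println(ranged.getStart() + "/" + ranged.getLength());
            System.out.println(wholeFile.getStart() + "/" + wholeFile.getLength());
        }
    }
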
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSource.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSource.java
index 1d9c01e62f..413c1ec3ef 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSource.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSource.java
@@ -25,7 +25,11 @@ import 
org.apache.seatunnel.connectors.seatunnel.file.source.BaseMultipleTableFi
 public class HdfsFileSource extends BaseMultipleTableFileSource {
 
     public HdfsFileSource(ReadonlyConfig readonlyConfig) {
-        super(new MultipleTableHdfsFileSourceConfig(readonlyConfig));
+        this(new MultipleTableHdfsFileSourceConfig(readonlyConfig));
+    }
+
+    private HdfsFileSource(MultipleTableHdfsFileSourceConfig sourceConfig) {
+        super(sourceConfig, initFileSplitStrategy(sourceConfig));
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSourceFactory.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSourceFactory.java
index 6f8211873a..e2cbd802de 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSourceFactory.java
@@ -85,6 +85,18 @@ public class HdfsFileSourceFactory implements 
TableSourceFactory {
                         Arrays.asList(
                                 FileFormat.TEXT, FileFormat.JSON, 
FileFormat.CSV, FileFormat.XML),
                         FileBaseSourceOptions.ENCODING)
+                .conditional(
+                        FileBaseSourceOptions.FILE_FORMAT_TYPE,
+                        Arrays.asList(
+                                FileFormat.TEXT,
+                                FileFormat.JSON,
+                                FileFormat.CSV,
+                                FileFormat.PARQUET),
+                        FileBaseSourceOptions.ENABLE_FILE_SPLIT)
+                .conditional(
+                        FileBaseSourceOptions.ENABLE_FILE_SPLIT,
+                        Boolean.TRUE,
+                        FileBaseSourceOptions.FILE_SPLIT_SIZE)
                 .optional(FileBaseSourceOptions.PARSE_PARTITION_FROM_PATH)
                 .optional(FileBaseSourceOptions.DATE_FORMAT_LEGACY)
                 .optional(FileBaseSourceOptions.DATETIME_FORMAT_LEGACY)
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/split/HdfsFileAccordingToSplitSizeSplitStrategyTest.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/split/HdfsFileAccordingToSplitSizeSplitStrategyTest.java
new file mode 100644
index 0000000000..30abb54666
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/split/HdfsFileAccordingToSplitSizeSplitStrategyTest.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.connectors.seatunnel.file.hdfs.source.split;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;
+
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+import 
org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSourceOptions;
+import 
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.file.hdfs.config.HdfsFileHadoopConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.file.source.reader.TextReadStrategy;
+import 
org.apache.seatunnel.connectors.seatunnel.file.source.split.AccordingToSplitSizeSplitStrategy;
+import 
org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSourceSplit;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public class HdfsFileAccordingToSplitSizeSplitStrategyTest {
+
+    @TempDir private Path tempDir;
+
+    @Test
+    void testSplitNonExistingFileShouldThrowFileNotFound() throws Exception {
+        String fileUri = tempDir.resolve("not_exist.txt").toUri().toString();
+        try (AccordingToSplitSizeSplitStrategy strategy =
+                new AccordingToSplitSizeSplitStrategy(
+                        new HdfsFileHadoopConfig("file:///"), "\n", 0, 
"UTF-8", 6)) {
+            SeaTunnelRuntimeException ex =
+                    Assertions.assertThrows(
+                            SeaTunnelRuntimeException.class, () -> 
strategy.split("t", fileUri));
+            Assertions.assertEquals(
+                    FileConnectorErrorCode.FILE_NOT_FOUND, 
ex.getSeaTunnelErrorCode());
+        }
+    }
+
+    @Test
+    void testSplitByDelimiterSeek() throws IOException {
+        Path filePath = tempDir.resolve("test.txt");
+        Files.write(filePath, 
"abc\nabc\nabc\nabc\nabc\n".getBytes(StandardCharsets.UTF_8));
+
+        String fileUri = filePath.toUri().toString();
+        try (AccordingToSplitSizeSplitStrategy strategy =
+                new AccordingToSplitSizeSplitStrategy(
+                        new HdfsFileHadoopConfig("file:///"), "\n", 0, 
"UTF-8", 6)) {
+            List<FileSourceSplit> splits = strategy.split("t", fileUri);
+            Assertions.assertEquals(3, splits.size());
+
+            Assertions.assertEquals(0, splits.get(0).getStart());
+            Assertions.assertEquals(8, splits.get(0).getLength());
+
+            Assertions.assertEquals(8, splits.get(1).getStart());
+            Assertions.assertEquals(8, splits.get(1).getLength());
+
+            Assertions.assertEquals(16, splits.get(2).getStart());
+            Assertions.assertEquals(4, splits.get(2).getLength());
+        }
+    }
+
+    @Test
+    void testSplitWithSkipHeaderLine() throws IOException {
+        Path filePath = tempDir.resolve("with_header.txt");
+        Files.write(filePath, 
"header\nabc\nabc\nabc\nabc\n".getBytes(StandardCharsets.UTF_8));
+
+        String fileUri = filePath.toUri().toString();
+        try (AccordingToSplitSizeSplitStrategy strategy =
+                new AccordingToSplitSizeSplitStrategy(
+                        new HdfsFileHadoopConfig("file:///"), "\n", 1, 
"UTF-8", 6)) {
+            List<FileSourceSplit> splits = strategy.split("t", fileUri);
+            Assertions.assertEquals(2, splits.size());
+
+            Assertions.assertEquals(7, splits.get(0).getStart());
+            Assertions.assertEquals(8, splits.get(0).getLength());
+
+            Assertions.assertEquals(15, splits.get(1).getStart());
+            Assertions.assertEquals(8, splits.get(1).getLength());
+        }
+    }
+
+    @Test
+    void testSplitWithCrLfDelimiter() throws IOException {
+        Path filePath = tempDir.resolve("crlf.txt");
+        Files.write(filePath, 
"a\r\nb\r\nc\r\n".getBytes(StandardCharsets.UTF_8));
+
+        String fileUri = filePath.toUri().toString();
+        try (AccordingToSplitSizeSplitStrategy strategy =
+                new AccordingToSplitSizeSplitStrategy(
+                        new HdfsFileHadoopConfig("file:///"), "\r\n", 0, 
"UTF-8", 2)) {
+            List<FileSourceSplit> splits = strategy.split("t", fileUri);
+            Assertions.assertEquals(3, splits.size());
+
+            Assertions.assertEquals(0, splits.get(0).getStart());
+            Assertions.assertEquals(3, splits.get(0).getLength());
+
+            Assertions.assertEquals(3, splits.get(1).getStart());
+            Assertions.assertEquals(3, splits.get(1).getLength());
+
+            Assertions.assertEquals(6, splits.get(2).getStart());
+            Assertions.assertEquals(3, splits.get(2).getLength());
+        }
+    }
+
+    @Test
+    void testReadBySplitsShouldMatchFullRead() throws Exception {
+        Path filePath = tempDir.resolve("read_compare.txt");
+        List<String> lines = new ArrayList<>();
+        lines.add("header");
+        for (int i = 1; i <= 200; i++) {
+            lines.add("value-" + i);
+        }
+        Files.write(filePath, (String.join("\n", lines) + 
"\n").getBytes(StandardCharsets.UTF_8));
+
+        String fileUri = filePath.toUri().toString();
+        HdfsFileHadoopConfig hadoopConf = new HdfsFileHadoopConfig("file:///");
+        String tableId = "t";
+
+        List<String> fullReadResult =
+                readByTextStrategy(
+                        hadoopConf,
+                        fileUri,
+                        tableId,
+                        Collections.singletonList(new FileSourceSplit(tableId, 
fileUri)),
+                        false,
+                        "\n",
+                        1);
+        Assertions.assertEquals(200, fullReadResult.size());
+        Assertions.assertEquals("value-1", fullReadResult.get(0));
+
+        List<FileSourceSplit> splits;
+        try (AccordingToSplitSizeSplitStrategy splitStrategy =
+                new AccordingToSplitSizeSplitStrategy(hadoopConf, "\n", 1, 
"UTF-8", 64)) {
+            splits = splitStrategy.split(tableId, fileUri);
+        }
+        Assertions.assertTrue(splits.size() > 1);
+
+        List<String> splitReadResult =
+                readByTextStrategy(hadoopConf, fileUri, tableId, splits, true, 
"\n", 1);
+        Assertions.assertEquals(fullReadResult, splitReadResult);
+    }
+
+    private static List<String> readByTextStrategy(
+            HdfsFileHadoopConfig hadoopConf,
+            String fileUri,
+            String tableId,
+            List<FileSourceSplit> splits,
+            boolean enableFileSplit,
+            String rowDelimiter,
+            long skipHeaderRows)
+            throws Exception {
+        Config pluginConfig =
+                ConfigFactory.empty()
+                        .withValue(
+                                FileBaseSourceOptions.FILE_PATH.key(),
+                                ConfigValueFactory.fromAnyRef(fileUri))
+                        .withValue(
+                                FileBaseSourceOptions.ENABLE_FILE_SPLIT.key(),
+                                ConfigValueFactory.fromAnyRef(enableFileSplit))
+                        .withValue(
+                                FileBaseSourceOptions.ROW_DELIMITER.key(),
+                                ConfigValueFactory.fromAnyRef(rowDelimiter))
+                        .withValue(
+                                
FileBaseSourceOptions.SKIP_HEADER_ROW_NUMBER.key(),
+                                ConfigValueFactory.fromAnyRef(skipHeaderRows));
+
+        List<String> results = new ArrayList<>();
+        try (TextReadStrategy readStrategy = new TextReadStrategy()) {
+            readStrategy.setPluginConfig(pluginConfig);
+            readStrategy.init(hadoopConf);
+            readStrategy.getFileNamesByPath(fileUri);
+            
readStrategy.setCatalogTable(CatalogTableUtil.buildSimpleTextTable());
+
+            FirstFieldCollector collector = new FirstFieldCollector(tableId, 
results);
+            for (FileSourceSplit split : splits) {
+                readStrategy.read(split, collector);
+            }
+        }
+        return results;
+    }
+
+    private static class FirstFieldCollector implements 
Collector<SeaTunnelRow> {
+        private final Object lock = new Object();
+        private final String tableId;
+        private final List<String> rows;
+
+        private FirstFieldCollector(String tableId, List<String> rows) {
+            this.tableId = tableId;
+            this.rows = rows;
+        }
+
+        @Override
+        public void collect(SeaTunnelRow record) {
+            Assertions.assertEquals(tableId, record.getTableId());
+            Object field = record.getField(0);
+            rows.add(field == null ? null : String.valueOf(field));
+        }
+
+        @Override
+        public Object getCheckpointLock() {
+            return lock;
+        }
+    }
+}
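
On the offsets asserted in testSplitByDelimiterSeek: each record "abc\n" is 4 bytes
(20 bytes in total), and with a split size of 6 the strategy extends every boundary
forward to the end of the next row delimiter, which yields ranges of 8, 8 and 4 bytes.
The loop below is a simplified, single-byte-delimiter sketch of that idea, not the
connector's actual implementation.

    import java.nio.charset.StandardCharsets;
    import java.util.ArrayList;
    import java.util.List;

    public class DelimiterAlignedSplitSketch {
        public static void main(String[] args) {
            byte[] data = "abc\nabc\nabc\nabc\nabc\n".getBytes(StandardCharsets.UTF_8);
            long splitSize = 6;
            List<long[]> ranges = new ArrayList<>(); // {start, length}
            long start = 0;
            while (start < data.length) {
                long end = Math.min(start + splitSize, data.length);
                // extend until the last byte inside the split is the delimiter
                while (end < data.length && data[(int) end - 1] != '\n') {
                    end++;
                }
                ranges.add(new long[] {start, end - start});
                start = end;
            }
            // prints 0/8, 8/8, 16/4 -- the same ranges the test asserts
            ranges.forEach(r -> System.out.println(r[0] + "/" + r[1]));
        }
    }
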
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/split/HdfsFileSplitStrategyFactoryTest.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/split/HdfsFileSplitStrategyFactoryTest.java
new file mode 100644
index 0000000000..c480698220
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/split/HdfsFileSplitStrategyFactoryTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.connectors.seatunnel.file.hdfs.source.split;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.file.config.ArchiveCompressFormat;
+import org.apache.seatunnel.connectors.seatunnel.file.config.CompressFormat;
+import 
org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSourceOptions;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
+import 
org.apache.seatunnel.connectors.seatunnel.file.hdfs.config.HdfsFileHadoopConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.file.hdfs.source.config.HdfsSourceConfigOptions;
+import 
org.apache.seatunnel.connectors.seatunnel.file.source.split.AccordingToSplitSizeSplitStrategy;
+import 
org.apache.seatunnel.connectors.seatunnel.file.source.split.DefaultFileSplitStrategy;
+import 
org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSplitStrategy;
+import 
org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSplitStrategyFactory;
+import 
org.apache.seatunnel.connectors.seatunnel.file.source.split.ParquetFileSplitStrategy;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.io.Closeable;
+import java.util.HashMap;
+import java.util.Map;
+
+public class HdfsFileSplitStrategyFactoryTest {
+
+    @Test
+    void testInitFileSplitStrategy() {
+        HdfsFileHadoopConfig hadoopConf = new HdfsFileHadoopConfig("file:///");
+
+        Map<String, Object> map = baseConfig(FileFormat.ORC);
+        map.put(FileBaseSourceOptions.ENABLE_FILE_SPLIT.key(), true);
+        FileSplitStrategy fileSplitStrategy =
+                FileSplitStrategyFactory.initFileSplitStrategy(
+                        ReadonlyConfig.fromMap(map), hadoopConf);
+        Assertions.assertInstanceOf(DefaultFileSplitStrategy.class, 
fileSplitStrategy);
+        closeQuietly(fileSplitStrategy);
+
+        Map<String, Object> map1 = baseConfig(FileFormat.TEXT);
+        fileSplitStrategy =
+                FileSplitStrategyFactory.initFileSplitStrategy(
+                        ReadonlyConfig.fromMap(map1), hadoopConf);
+        Assertions.assertInstanceOf(DefaultFileSplitStrategy.class, 
fileSplitStrategy);
+        closeQuietly(fileSplitStrategy);
+
+        Map<String, Object> map2 = baseConfig(FileFormat.TEXT);
+        map2.put(FileBaseSourceOptions.ENABLE_FILE_SPLIT.key(), true);
+        fileSplitStrategy =
+                FileSplitStrategyFactory.initFileSplitStrategy(
+                        ReadonlyConfig.fromMap(map2), hadoopConf);
+        Assertions.assertInstanceOf(AccordingToSplitSizeSplitStrategy.class, 
fileSplitStrategy);
+        closeQuietly(fileSplitStrategy);
+
+        Map<String, Object> map3 = baseConfig(FileFormat.CSV);
+        map3.put(FileBaseSourceOptions.ENABLE_FILE_SPLIT.key(), true);
+        fileSplitStrategy =
+                FileSplitStrategyFactory.initFileSplitStrategy(
+                        ReadonlyConfig.fromMap(map3), hadoopConf);
+        Assertions.assertInstanceOf(AccordingToSplitSizeSplitStrategy.class, 
fileSplitStrategy);
+        closeQuietly(fileSplitStrategy);
+
+        Map<String, Object> map4 = baseConfig(FileFormat.JSON);
+        map4.put(FileBaseSourceOptions.ENABLE_FILE_SPLIT.key(), true);
+        fileSplitStrategy =
+                FileSplitStrategyFactory.initFileSplitStrategy(
+                        ReadonlyConfig.fromMap(map4), hadoopConf);
+        Assertions.assertInstanceOf(AccordingToSplitSizeSplitStrategy.class, 
fileSplitStrategy);
+        closeQuietly(fileSplitStrategy);
+
+        Map<String, Object> map5 = baseConfig(FileFormat.PARQUET);
+        map5.put(FileBaseSourceOptions.ENABLE_FILE_SPLIT.key(), true);
+        fileSplitStrategy =
+                FileSplitStrategyFactory.initFileSplitStrategy(
+                        ReadonlyConfig.fromMap(map5), hadoopConf);
+        Assertions.assertInstanceOf(ParquetFileSplitStrategy.class, 
fileSplitStrategy);
+        closeQuietly(fileSplitStrategy);
+
+        Map<String, Object> map6 = baseConfig(FileFormat.PARQUET);
+        map6.put(FileBaseSourceOptions.ENABLE_FILE_SPLIT.key(), true);
+        map6.put(FileBaseSourceOptions.COMPRESS_CODEC.key(), 
CompressFormat.LZO);
+        map6.put(FileBaseSourceOptions.ARCHIVE_COMPRESS_CODEC.key(), 
ArchiveCompressFormat.NONE);
+        fileSplitStrategy =
+                FileSplitStrategyFactory.initFileSplitStrategy(
+                        ReadonlyConfig.fromMap(map6), hadoopConf);
+        Assertions.assertInstanceOf(DefaultFileSplitStrategy.class, 
fileSplitStrategy);
+        closeQuietly(fileSplitStrategy);
+    }
+
+    private Map<String, Object> baseConfig(FileFormat fileFormat) {
+        Map<String, Object> map = new HashMap<>();
+        map.put(FileBaseSourceOptions.FILE_FORMAT_TYPE.key(), fileFormat);
+        map.put(HdfsSourceConfigOptions.DEFAULT_FS.key(), "file:///");
+        return map;
+    }
+
+    private void closeQuietly(FileSplitStrategy strategy) {
+        try {
+            if (strategy instanceof Closeable) {
+                ((Closeable) strategy).close();
+                return;
+            }
+            if (strategy instanceof AutoCloseable) {
+                ((AutoCloseable) strategy).close();
+            }
+        } catch (Exception ignored) {
+            // ignore
+        }
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java
index bf3311efab..fd5e24d6ff 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java
@@ -20,15 +20,16 @@ package 
org.apache.seatunnel.connectors.seatunnel.file.local.source;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
 import 
org.apache.seatunnel.connectors.seatunnel.file.local.source.config.MultipleTableLocalFileSourceConfig;
-import 
org.apache.seatunnel.connectors.seatunnel.file.local.source.split.LocalFileSplitStrategyFactory;
 import 
org.apache.seatunnel.connectors.seatunnel.file.source.BaseMultipleTableFileSource;
 
 public class LocalFileSource extends BaseMultipleTableFileSource {
 
     public LocalFileSource(ReadonlyConfig readonlyConfig) {
-        super(
-                new MultipleTableLocalFileSourceConfig(readonlyConfig),
-                
LocalFileSplitStrategyFactory.initFileSplitStrategy(readonlyConfig));
+        this(new MultipleTableLocalFileSourceConfig(readonlyConfig));
+    }
+
+    private LocalFileSource(MultipleTableLocalFileSourceConfig sourceConfig) {
+        super(sourceConfig, initFileSplitStrategy(sourceConfig));
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/split/LocalFileAccordingToSplitSizeSplitStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/split/LocalFileAccordingToSplitSizeSplitStrategy.java
index 3a6cb18b13..10faa8a9c6 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/split/LocalFileAccordingToSplitSizeSplitStrategy.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/split/LocalFileAccordingToSplitSizeSplitStrategy.java
@@ -16,44 +16,25 @@
  */
 package org.apache.seatunnel.connectors.seatunnel.file.local.source.split;
 
+import 
org.apache.seatunnel.connectors.seatunnel.file.local.config.LocalFileHadoopConf;
 import 
org.apache.seatunnel.connectors.seatunnel.file.source.split.AccordingToSplitSizeSplitStrategy;
 
-import java.io.BufferedInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URI;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-
+/**
+ * Compatibility adapter for historical local-file split strategy.
+ *
+ * @deprecated Use {@link AccordingToSplitSizeSplitStrategy} via {@link
+ *     
org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSplitStrategyFactory}.
+ */
+@Deprecated
 public class LocalFileAccordingToSplitSizeSplitStrategy extends 
AccordingToSplitSizeSplitStrategy {
 
     public LocalFileAccordingToSplitSizeSplitStrategy(
             String rowDelimiter, long skipHeaderRowNumber, String 
encodingName, long splitSize) {
-        super(rowDelimiter, skipHeaderRowNumber, encodingName, splitSize);
-    }
-
-    @Override
-    protected InputStream getInputStream(String filePath) throws IOException {
-        Path path = toLocalNioPath(filePath);
-        return new BufferedInputStream(Files.newInputStream(path));
-    }
-
-    @Override
-    protected long getFileSize(String filePath) throws IOException {
-        Path path = toLocalNioPath(filePath);
-        return Files.size(path);
-    }
-
-    private static Path toLocalNioPath(String filePath) {
-        try {
-            URI uri = URI.create(filePath);
-            if ("file".equalsIgnoreCase(uri.getScheme())) {
-                return Paths.get(uri);
-            }
-        } catch (Exception ignored) {
-            // ignore malformed URI
-        }
-        return Paths.get(filePath);
+        super(
+                new LocalFileHadoopConf(),
+                rowDelimiter,
+                skipHeaderRowNumber,
+                encodingName,
+                splitSize);
     }
 }
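
The deprecation note above points callers to the shared AccordingToSplitSizeSplitStrategy,
which now takes a Hadoop conf as its first constructor argument so that local files go
through the same code path as HDFS. Below is a minimal sketch of the replacement usage,
mirroring the updated SplitFileStrategyTest further down; the sample path is hypothetical.

    import org.apache.seatunnel.connectors.seatunnel.file.local.config.LocalFileHadoopConf;
    import org.apache.seatunnel.connectors.seatunnel.file.source.split.AccordingToSplitSizeSplitStrategy;
    import org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSourceSplit;

    import java.util.List;

    public class LocalSplitSketch {
        public static void main(String[] args) throws Exception {
            try (AccordingToSplitSizeSplitStrategy strategy =
                    new AccordingToSplitSizeSplitStrategy(
                            new LocalFileHadoopConf(), "\n", 0L, "utf-8", 100L)) {
                // hypothetical local CSV; returns delimiter-aligned (start, length) splits
                List<FileSourceSplit> splits = strategy.split("test.table", "/tmp/example.csv");
                splits.forEach(s -> System.out.println(s.getStart() + "/" + s.getLength()));
            }
        }
    }
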
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/local/LocalFileSourceTest.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/local/LocalFileSourceTest.java
index 379f2303a5..a64af3b81a 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/local/LocalFileSourceTest.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/local/LocalFileSourceTest.java
@@ -21,15 +21,17 @@ import 
org.apache.seatunnel.connectors.seatunnel.file.config.ArchiveCompressForm
 import org.apache.seatunnel.connectors.seatunnel.file.config.CompressFormat;
 import 
org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSourceOptions;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
-import 
org.apache.seatunnel.connectors.seatunnel.file.local.source.split.LocalFileAccordingToSplitSizeSplitStrategy;
-import 
org.apache.seatunnel.connectors.seatunnel.file.local.source.split.LocalFileSplitStrategyFactory;
+import 
org.apache.seatunnel.connectors.seatunnel.file.local.config.LocalFileHadoopConf;
+import 
org.apache.seatunnel.connectors.seatunnel.file.source.split.AccordingToSplitSizeSplitStrategy;
 import 
org.apache.seatunnel.connectors.seatunnel.file.source.split.DefaultFileSplitStrategy;
 import 
org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSplitStrategy;
+import 
org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSplitStrategyFactory;
 import 
org.apache.seatunnel.connectors.seatunnel.file.source.split.ParquetFileSplitStrategy;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
+import java.io.Closeable;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -42,45 +44,54 @@ public class LocalFileSourceTest {
         map.put(FileBaseSourceOptions.FILE_FORMAT_TYPE.key(), FileFormat.ORC);
         map.put(FileBaseSourceOptions.ENABLE_FILE_SPLIT.key(), true);
         FileSplitStrategy fileSplitStrategy =
-                
LocalFileSplitStrategyFactory.initFileSplitStrategy(ReadonlyConfig.fromMap(map));
+                FileSplitStrategyFactory.initFileSplitStrategy(
+                        ReadonlyConfig.fromMap(map), new 
LocalFileHadoopConf());
         Assertions.assertInstanceOf(DefaultFileSplitStrategy.class, 
fileSplitStrategy);
+        closeQuietly(fileSplitStrategy);
         // test text, no split
         Map<String, Object> map1 = new HashMap<>();
         map1.put(FileBaseSourceOptions.FILE_FORMAT_TYPE.key(), 
FileFormat.TEXT);
         fileSplitStrategy =
-                
LocalFileSplitStrategyFactory.initFileSplitStrategy(ReadonlyConfig.fromMap(map1));
+                FileSplitStrategyFactory.initFileSplitStrategy(
+                        ReadonlyConfig.fromMap(map1), new 
LocalFileHadoopConf());
         Assertions.assertInstanceOf(DefaultFileSplitStrategy.class, 
fileSplitStrategy);
+        closeQuietly(fileSplitStrategy);
         // test text, split
         Map<String, Object> map2 = new HashMap<>();
         map2.put(FileBaseSourceOptions.FILE_FORMAT_TYPE.key(), 
FileFormat.TEXT);
         map2.put(FileBaseSourceOptions.ENABLE_FILE_SPLIT.key(), true);
         fileSplitStrategy =
-                
LocalFileSplitStrategyFactory.initFileSplitStrategy(ReadonlyConfig.fromMap(map2));
-        Assertions.assertInstanceOf(
-                LocalFileAccordingToSplitSizeSplitStrategy.class, 
fileSplitStrategy);
+                FileSplitStrategyFactory.initFileSplitStrategy(
+                        ReadonlyConfig.fromMap(map2), new 
LocalFileHadoopConf());
+        Assertions.assertInstanceOf(AccordingToSplitSizeSplitStrategy.class, 
fileSplitStrategy);
+        closeQuietly(fileSplitStrategy);
         // test csv, split
         Map<String, Object> map3 = new HashMap<>();
         map3.put(FileBaseSourceOptions.FILE_FORMAT_TYPE.key(), FileFormat.CSV);
         map3.put(FileBaseSourceOptions.ENABLE_FILE_SPLIT.key(), true);
         fileSplitStrategy =
-                
LocalFileSplitStrategyFactory.initFileSplitStrategy(ReadonlyConfig.fromMap(map3));
-        Assertions.assertInstanceOf(
-                LocalFileAccordingToSplitSizeSplitStrategy.class, 
fileSplitStrategy);
+                FileSplitStrategyFactory.initFileSplitStrategy(
+                        ReadonlyConfig.fromMap(map3), new 
LocalFileHadoopConf());
+        Assertions.assertInstanceOf(AccordingToSplitSizeSplitStrategy.class, 
fileSplitStrategy);
+        closeQuietly(fileSplitStrategy);
         // test json, split
         Map<String, Object> map4 = new HashMap<>();
         map4.put(FileBaseSourceOptions.FILE_FORMAT_TYPE.key(), 
FileFormat.JSON);
         map4.put(FileBaseSourceOptions.ENABLE_FILE_SPLIT.key(), true);
         fileSplitStrategy =
-                
LocalFileSplitStrategyFactory.initFileSplitStrategy(ReadonlyConfig.fromMap(map4));
-        Assertions.assertInstanceOf(
-                LocalFileAccordingToSplitSizeSplitStrategy.class, 
fileSplitStrategy);
+                FileSplitStrategyFactory.initFileSplitStrategy(
+                        ReadonlyConfig.fromMap(map4), new 
LocalFileHadoopConf());
+        Assertions.assertInstanceOf(AccordingToSplitSizeSplitStrategy.class, 
fileSplitStrategy);
+        closeQuietly(fileSplitStrategy);
         // test parquet, split
         Map<String, Object> map5 = new HashMap<>();
         map5.put(FileBaseSourceOptions.FILE_FORMAT_TYPE.key(), 
FileFormat.PARQUET);
         map5.put(FileBaseSourceOptions.ENABLE_FILE_SPLIT.key(), true);
         fileSplitStrategy =
-                
LocalFileSplitStrategyFactory.initFileSplitStrategy(ReadonlyConfig.fromMap(map5));
+                FileSplitStrategyFactory.initFileSplitStrategy(
+                        ReadonlyConfig.fromMap(map5), new 
LocalFileHadoopConf());
         Assertions.assertInstanceOf(ParquetFileSplitStrategy.class, 
fileSplitStrategy);
+        closeQuietly(fileSplitStrategy);
         // test compress 1
         Map<String, Object> map6 = new HashMap<>();
         map6.put(FileBaseSourceOptions.FILE_FORMAT_TYPE.key(), 
FileFormat.PARQUET);
@@ -88,8 +99,10 @@ public class LocalFileSourceTest {
         map6.put(FileBaseSourceOptions.COMPRESS_CODEC.key(), 
CompressFormat.LZO);
         map6.put(FileBaseSourceOptions.ARCHIVE_COMPRESS_CODEC.key(), 
ArchiveCompressFormat.NONE);
         fileSplitStrategy =
-                
LocalFileSplitStrategyFactory.initFileSplitStrategy(ReadonlyConfig.fromMap(map6));
+                FileSplitStrategyFactory.initFileSplitStrategy(
+                        ReadonlyConfig.fromMap(map6), new 
LocalFileHadoopConf());
         Assertions.assertInstanceOf(DefaultFileSplitStrategy.class, 
fileSplitStrategy);
+        closeQuietly(fileSplitStrategy);
         // test compress 2
         Map<String, Object> map7 = new HashMap<>();
         map7.put(FileBaseSourceOptions.FILE_FORMAT_TYPE.key(), 
FileFormat.PARQUET);
@@ -97,7 +110,23 @@ public class LocalFileSourceTest {
         map7.put(FileBaseSourceOptions.COMPRESS_CODEC.key(), 
CompressFormat.NONE);
         map7.put(FileBaseSourceOptions.ARCHIVE_COMPRESS_CODEC.key(), 
ArchiveCompressFormat.NONE);
         fileSplitStrategy =
-                
LocalFileSplitStrategyFactory.initFileSplitStrategy(ReadonlyConfig.fromMap(map7));
+                FileSplitStrategyFactory.initFileSplitStrategy(
+                        ReadonlyConfig.fromMap(map7), new 
LocalFileHadoopConf());
         Assertions.assertInstanceOf(ParquetFileSplitStrategy.class, 
fileSplitStrategy);
+        closeQuietly(fileSplitStrategy);
+    }
+
+    private void closeQuietly(FileSplitStrategy strategy) {
+        try {
+            if (strategy instanceof Closeable) {
+                ((Closeable) strategy).close();
+                return;
+            }
+            if (strategy instanceof AutoCloseable) {
+                ((AutoCloseable) strategy).close();
+            }
+        } catch (Exception ignored) {
+            // ignore
+        }
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/local/SplitFileStrategyTest.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/local/SplitFileStrategyTest.java
index c47aa7214d..e2c1f1a09c 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/local/SplitFileStrategyTest.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/local/SplitFileStrategyTest.java
@@ -16,7 +16,8 @@
  */
 package org.apache.seatunnel.connectors.seatunnel.file.local;
 
-import 
org.apache.seatunnel.connectors.seatunnel.file.local.source.split.LocalFileAccordingToSplitSizeSplitStrategy;
+import 
org.apache.seatunnel.connectors.seatunnel.file.local.config.LocalFileHadoopConf;
+import 
org.apache.seatunnel.connectors.seatunnel.file.source.split.AccordingToSplitSizeSplitStrategy;
 import 
org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSourceSplit;
 
 import org.junit.jupiter.api.Assertions;
@@ -39,18 +40,21 @@ public class SplitFileStrategyTest {
     @SneakyThrows
     @Test
     public void testSplitNoSkipHeader() {
-        final LocalFileAccordingToSplitSizeSplitStrategy 
localFileSplitStrategy =
-                new LocalFileAccordingToSplitSizeSplitStrategy("\n", 0L, 
"utf-8", 100L);
         URL url = 
getClass().getClassLoader().getResource("test_split_csv_data.csv");
         String realPath = Paths.get(url.toURI()).toString();
-        final List<FileSourceSplit> splits = 
localFileSplitStrategy.split("test.table", realPath);
-        Assertions.assertEquals(2, splits.size());
-        // check split-1
-        Assertions.assertEquals(0, splits.get(0).getStart());
-        Assertions.assertEquals(105, splits.get(0).getLength());
-        // check split-2
-        Assertions.assertEquals(105, splits.get(1).getStart());
-        Assertions.assertEquals(85, splits.get(1).getLength());
+        try (AccordingToSplitSizeSplitStrategy localFileSplitStrategy =
+                new AccordingToSplitSizeSplitStrategy(
+                        new LocalFileHadoopConf(), "\n", 0L, "utf-8", 100L)) {
+            final List<FileSourceSplit> splits =
+                    localFileSplitStrategy.split("test.table", realPath);
+            Assertions.assertEquals(2, splits.size());
+            // check split-1
+            Assertions.assertEquals(0, splits.get(0).getStart());
+            Assertions.assertEquals(105, splits.get(0).getLength());
+            // check split-2
+            Assertions.assertEquals(105, splits.get(1).getStart());
+            Assertions.assertEquals(85, splits.get(1).getLength());
+        }
     }
 
     @DisabledOnOs(
@@ -60,24 +64,27 @@ public class SplitFileStrategyTest {
     @SneakyThrows
     @Test
     public void testSplitSkipHeader() {
-        final LocalFileAccordingToSplitSizeSplitStrategy 
localFileSplitStrategy =
-                new LocalFileAccordingToSplitSizeSplitStrategy("\n", 1L, 
"utf-8", 30L);
         URL url = 
getClass().getClassLoader().getResource("test_split_csv_data.csv");
         String realPath = Paths.get(url.toURI()).toString();
-        final List<FileSourceSplit> splits = 
localFileSplitStrategy.split("test.table", realPath);
-        Assertions.assertEquals(4, splits.size());
-        // check split-1
-        Assertions.assertEquals(21, splits.get(0).getStart());
-        Assertions.assertEquals(41, splits.get(0).getLength());
-        // check split-2
-        Assertions.assertEquals(62, splits.get(1).getStart());
-        Assertions.assertEquals(43, splits.get(1).getLength());
-        // check split-3
-        Assertions.assertEquals(105, splits.get(2).getStart());
-        Assertions.assertEquals(43, splits.get(2).getLength());
-        // check split-4
-        Assertions.assertEquals(148, splits.get(3).getStart());
-        Assertions.assertEquals(42, splits.get(3).getLength());
+        try (AccordingToSplitSizeSplitStrategy localFileSplitStrategy =
+                new AccordingToSplitSizeSplitStrategy(
+                        new LocalFileHadoopConf(), "\n", 1L, "utf-8", 30L)) {
+            final List<FileSourceSplit> splits =
+                    localFileSplitStrategy.split("test.table", realPath);
+            Assertions.assertEquals(4, splits.size());
+            // check split-1
+            Assertions.assertEquals(21, splits.get(0).getStart());
+            Assertions.assertEquals(41, splits.get(0).getLength());
+            // check split-2
+            Assertions.assertEquals(62, splits.get(1).getStart());
+            Assertions.assertEquals(43, splits.get(1).getLength());
+            // check split-3
+            Assertions.assertEquals(105, splits.get(2).getStart());
+            Assertions.assertEquals(43, splits.get(2).getLength());
+            // check split-4
+            Assertions.assertEquals(148, splits.get(3).getStart());
+            Assertions.assertEquals(42, splits.get(3).getLength());
+        }
     }
 
     @DisabledOnOs(
@@ -87,15 +94,18 @@ public class SplitFileStrategyTest {
     @SneakyThrows
     @Test
     public void testSplitSkipHeaderLargeSize() {
-        final LocalFileAccordingToSplitSizeSplitStrategy 
localFileSplitStrategy =
-                new LocalFileAccordingToSplitSizeSplitStrategy("\n", 1L, 
"utf-8", 300L);
         URL url = 
getClass().getClassLoader().getResource("test_split_csv_data.csv");
         String realPath = Paths.get(url.toURI()).toString();
-        final List<FileSourceSplit> splits = 
localFileSplitStrategy.split("test.table", realPath);
-        Assertions.assertEquals(1, splits.size());
-        // check split-1
-        Assertions.assertEquals(21, splits.get(0).getStart());
-        Assertions.assertEquals(169, splits.get(0).getLength());
+        try (AccordingToSplitSizeSplitStrategy localFileSplitStrategy =
+                new AccordingToSplitSizeSplitStrategy(
+                        new LocalFileHadoopConf(), "\n", 1L, "utf-8", 300L)) {
+            final List<FileSourceSplit> splits =
+                    localFileSplitStrategy.split("test.table", realPath);
+            Assertions.assertEquals(1, splits.size());
+            // check split-1
+            Assertions.assertEquals(21, splits.get(0).getStart());
+            Assertions.assertEquals(169, splits.get(0).getLength());
+        }
     }
 
     @DisabledOnOs(
@@ -105,51 +115,60 @@ public class SplitFileStrategyTest {
     @SneakyThrows
     @Test
     public void testSplitSkipHeaderSmallSize() {
-        final LocalFileAccordingToSplitSizeSplitStrategy 
localFileSplitStrategy =
-                new LocalFileAccordingToSplitSizeSplitStrategy("\n", 1L, 
"utf-8", 3L);
         URL url = 
getClass().getClassLoader().getResource("test_split_csv_data.csv");
         String realPath = Paths.get(url.toURI()).toString();
-        final List<FileSourceSplit> splits = 
localFileSplitStrategy.split("test.table", realPath);
-        Assertions.assertEquals(8, splits.size());
-        // check split
-        Assertions.assertEquals(21, splits.get(0).getStart());
-        Assertions.assertEquals(42, splits.get(1).getStart());
-        Assertions.assertEquals(62, splits.get(2).getStart());
-        Assertions.assertEquals(82, splits.get(3).getStart());
-        Assertions.assertEquals(105, splits.get(4).getStart());
-        Assertions.assertEquals(126, splits.get(5).getStart());
-        Assertions.assertEquals(148, splits.get(6).getStart());
-        Assertions.assertEquals(169, splits.get(7).getStart());
+        try (AccordingToSplitSizeSplitStrategy localFileSplitStrategy =
+                new AccordingToSplitSizeSplitStrategy(
+                        new LocalFileHadoopConf(), "\n", 1L, "utf-8", 3L)) {
+            final List<FileSourceSplit> splits =
+                    localFileSplitStrategy.split("test.table", realPath);
+            Assertions.assertEquals(8, splits.size());
+            // check split
+            Assertions.assertEquals(21, splits.get(0).getStart());
+            Assertions.assertEquals(42, splits.get(1).getStart());
+            Assertions.assertEquals(62, splits.get(2).getStart());
+            Assertions.assertEquals(82, splits.get(3).getStart());
+            Assertions.assertEquals(105, splits.get(4).getStart());
+            Assertions.assertEquals(126, splits.get(5).getStart());
+            Assertions.assertEquals(148, splits.get(6).getStart());
+            Assertions.assertEquals(169, splits.get(7).getStart());
+        }
     }
 
     @SneakyThrows
     @Test
     public void testSplitSkipHeaderSpecialRowDelimiter() {
-        final LocalFileAccordingToSplitSizeSplitStrategy 
localFileSplitStrategy =
-                new LocalFileAccordingToSplitSizeSplitStrategy("|^|", 1L, 
"utf-8", 80L);
         URL url =
                 getClass()
                         .getClassLoader()
                         
.getResource("test_split_special_row_delimiter_data.txt");
         String realPath = Paths.get(url.toURI()).toString();
-        final List<FileSourceSplit> splits = 
localFileSplitStrategy.split("test.table", realPath);
-        Assertions.assertEquals(2, splits.size());
-        // check split-1
-        Assertions.assertEquals(23, splits.get(0).getStart());
-        Assertions.assertEquals(92, splits.get(0).getLength());
-        // check split-2
-        Assertions.assertEquals(115, splits.get(1).getStart());
-        Assertions.assertEquals(91, splits.get(1).getLength());
+        try (AccordingToSplitSizeSplitStrategy localFileSplitStrategy =
+                new AccordingToSplitSizeSplitStrategy(
+                        new LocalFileHadoopConf(), "|^|", 1L, "utf-8", 80L)) {
+            final List<FileSourceSplit> splits =
+                    localFileSplitStrategy.split("test.table", realPath);
+            Assertions.assertEquals(2, splits.size());
+            // check split-1
+            Assertions.assertEquals(23, splits.get(0).getStart());
+            Assertions.assertEquals(92, splits.get(0).getLength());
+            // check split-2
+            Assertions.assertEquals(115, splits.get(1).getStart());
+            Assertions.assertEquals(91, splits.get(1).getLength());
+        }
     }
 
     @SneakyThrows
     @Test
     public void testSplitEmpty() {
-        final LocalFileAccordingToSplitSizeSplitStrategy localFileSplitStrategy =
-                new LocalFileAccordingToSplitSizeSplitStrategy("\n", 1L, "utf-8", 300L);
         URL url = getClass().getClassLoader().getResource("test_split_empty_data.csv");
         String realPath = Paths.get(url.toURI()).toString();
-        final List<FileSourceSplit> splits = localFileSplitStrategy.split("test.table", realPath);
-        Assertions.assertEquals(0, splits.size());
+        try (AccordingToSplitSizeSplitStrategy localFileSplitStrategy =
+                new AccordingToSplitSizeSplitStrategy(
+                        new LocalFileHadoopConf(), "\n", 1L, "utf-8", 300L)) {
+            final List<FileSourceSplit> splits =
+                    localFileSplitStrategy.split("test.table", realPath);
+            Assertions.assertEquals(0, splits.size());
+        }
     }
 }
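
[Editor's note] The unit tests above exercise the new base-class strategy directly. As a rough orientation for connector developers, the following standalone sketch shows the same API usage outside JUnit. It is an illustration only: the import paths and the meaning of the third constructor argument (taken here to be the number of header rows to skip) are assumptions inferred from this diff, not verified against the module.

    import java.util.List;

    import org.apache.seatunnel.connectors.seatunnel.file.source.split.AccordingToSplitSizeSplitStrategy;
    import org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSourceSplit;
    // Assumed location of the local-file Hadoop conf; adjust to the actual package.
    import org.apache.seatunnel.connectors.seatunnel.file.local.config.LocalFileHadoopConf;

    public class SplitStrategyUsageSketch {
        public static void main(String[] args) throws Exception {
            // Arguments mirror the tests above: Hadoop conf, row delimiter, header rows to
            // skip (assumed meaning), charset, and the target split size in bytes.
            try (AccordingToSplitSizeSplitStrategy strategy =
                    new AccordingToSplitSizeSplitStrategy(
                            new LocalFileHadoopConf(), "\n", 1L, "utf-8", 1024L)) {
                List<FileSourceSplit> splits = strategy.split("demo.table", args[0]);
                for (FileSourceSplit split : splits) {
                    // Each split is a delimiter-aligned byte range [start, start + length).
                    System.out.printf("start=%d length=%d%n", split.getStart(), split.getLength());
                }
            }
        }
    }
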
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-hadoop-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/hdfs/HdfsFileIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-hadoop-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/hdfs/HdfsFileIT.java
index 015ec79690..40bbdb9d8e 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-hadoop-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/hdfs/HdfsFileIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-hadoop-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/hdfs/HdfsFileIT.java
@@ -129,6 +129,28 @@ public class HdfsFileIT extends TestSuiteBase implements TestResource {
         Assertions.assertEquals(0, readResult.getExitCode());
     }
 
+    @TestTemplate
+    public void testHdfsParquetReadWithFileSplit(TestContainer container)
+            throws IOException, InterruptedException {
+        org.testcontainers.containers.Container.ExecResult writeResult =
+                container.executeJob("/fake_to_hdfs_normal.conf");
+        Assertions.assertEquals(0, writeResult.getExitCode());
+        org.testcontainers.containers.Container.ExecResult readResult =
+                container.executeJob("/hdfs_parquet_split_to_assert.conf");
+        Assertions.assertEquals(0, readResult.getExitCode());
+    }
+
+    @TestTemplate
+    public void testHdfsTextReadWithFileSplit(TestContainer container)
+            throws IOException, InterruptedException {
+        resetSplitTestPath();
+        putHdfsSequentialLinesFile("/split/input/test.txt", 1000);
+
+        org.testcontainers.containers.Container.ExecResult readResult =
+                container.executeJob("/hdfs_text_split_to_assert.conf");
+        Assertions.assertEquals(0, readResult.getExitCode());
+    }
+
     @TestTemplate
     public void testHdfsReadEmptyTextDirectory(TestContainer container)
             throws IOException, InterruptedException {
@@ -195,6 +217,13 @@ public class HdfsFileIT extends TestSuiteBase implements TestResource {
         Assertions.assertEquals(0, mkdirResult.getExitCode());
     }
 
+    private void resetSplitTestPath() throws IOException, InterruptedException {
+        nameNode.execInContainer("bash", "-c", "hdfs dfs -rm -r -f /split || true");
+        org.testcontainers.containers.Container.ExecResult mkdirResult =
+                nameNode.execInContainer("hdfs", "dfs", "-mkdir", "-p", "/split/input");
+        Assertions.assertEquals(0, mkdirResult.getExitCode());
+    }
+
     private void putHdfsFile(String hdfsPath, String content)
             throws IOException, InterruptedException {
+        String command = "printf '" + content + "' | hdfs dfs -put -f - " + hdfsPath;
@@ -203,6 +232,18 @@ public class HdfsFileIT extends TestSuiteBase implements TestResource {
         Assertions.assertEquals(0, putResult.getExitCode());
     }
 
+    private void putHdfsSequentialLinesFile(String hdfsPath, int lineCount)
+            throws IOException, InterruptedException {
+        String command =
+                "i=1; while [ $i -le "
+                        + lineCount
+                        + " ]; do echo $i; i=$((i+1)); done | hdfs dfs -put -f - "
+                        + hdfsPath;
+        org.testcontainers.containers.Container.ExecResult putResult =
+                nameNode.execInContainer("bash", "-c", command);
+        Assertions.assertEquals(0, putResult.getExitCode());
+    }
+
+    private String readHdfsFile(String hdfsPath) throws IOException, InterruptedException {
         org.testcontainers.containers.Container.ExecResult catResult =
                 nameNode.execInContainer("hdfs", "dfs", "-cat", hdfsPath);
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-hadoop-e2e/src/test/resources/hdfs_parquet_split_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-hadoop-e2e/src/test/resources/hdfs_parquet_split_to_assert.conf
new file mode 100644
index 0000000000..f5ac87c023
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-hadoop-e2e/src/test/resources/hdfs_parquet_split_to_assert.conf
@@ -0,0 +1,84 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  HdfsFile {
+    fs.defaultFS = "hdfs://namenode1:9000"
+    path = "/normal/output"
+    file_format_type = "parquet"
+    enable_file_split = true
+    file_split_size = 1024
+    schema = {
+      fields {
+        c_map = "map<string, string>"
+        c_array = "array<int>"
+        c_string = string
+        c_boolean = boolean
+        c_float = float
+        c_double = double
+        c_date = date
+        c_decimal = "decimal(38, 18)"
+        c_timestamp = timestamp
+      }
+    }
+    hadoop_conf = {
+      "dfs.replication" = 1
+    }
+  }
+}
+
+sink {
+  Assert {
+    rules {
+      row_rules = [
+        {
+          rule_type = MAX_ROW
+          rule_value = 100
+        },
+        {
+          rule_type = MIN_ROW
+          rule_value = 100
+        }
+      ]
+      field_rules = [
+        {
+          field_name = c_string
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = c_boolean
+          field_type = boolean
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        }
+      ]
+    }
+  }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-hadoop-e2e/src/test/resources/hdfs_text_split_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-hadoop-e2e/src/test/resources/hdfs_text_split_to_assert.conf
new file mode 100644
index 0000000000..91e2c52552
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-hadoop-e2e/src/test/resources/hdfs_text_split_to_assert.conf
@@ -0,0 +1,58 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  # NOTE: Spark runs this E2E with `--master local` (single thread). The Assert sink checks row
+  # rules per task commit, so using parallelism > 1 may validate before all partitions finish.
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  HdfsFile {
+    fs.defaultFS = "hdfs://namenode1:9000"
+    path = "/split/input/test.txt"
+    file_format_type = "text"
+    enable_file_split = true
+    file_split_size = 20
+    schema = {
+      fields {
+        line = string
+      }
+    }
+    hadoop_conf = {
+      "dfs.replication" = 1
+    }
+  }
+}
+
+sink {
+  Assert {
+    rules {
+      row_rules = [
+        {
+          rule_type = MAX_ROW
+          rule_value = 1000
+        },
+        {
+          rule_type = MIN_ROW
+          rule_value = 1000
+        }
+      ]
+    }
+  }
+}
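
[Editor's note] The two e2e configs above only toggle enable_file_split and set file_split_size; the offsets asserted in the unit tests earlier in this patch come from aligning each split boundary to the next row delimiter so that no record is cut in half. For readers who want that core idea without the Hadoop FileSystem plumbing, here is a self-contained sketch of the alignment step. It is an illustration only, not the connector's AccordingToSplitSizeSplitStrategy; the class, Range type, and split size are made up for the example.

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.Files;
    import java.nio.file.Paths;
    import java.util.ArrayList;
    import java.util.List;

    /** Standalone illustration of delimiter-aligned byte-range splitting. */
    public class DelimiterAlignedSplitSketch {

        /** A (start, length) pair; the real connector carries this in FileSourceSplit. */
        static final class Range {
            final long start;
            final long length;
            Range(long start, long length) {
                this.start = start;
                this.length = length;
            }
        }

        static List<Range> split(byte[] data, byte[] delimiter, long splitSize) {
            List<Range> ranges = new ArrayList<>();
            long start = 0;
            while (start < data.length) {
                long tentativeEnd = Math.min(start + splitSize, data.length);
                // Push the boundary to the end of the current row so no row is cut in half.
                long end = nextDelimiterEnd(data, (int) tentativeEnd, delimiter);
                ranges.add(new Range(start, end - start));
                start = end;
            }
            return ranges;
        }

        /** Returns the offset just past the next delimiter at or after pos, or the file end. */
        static long nextDelimiterEnd(byte[] data, int pos, byte[] delimiter) {
            for (int i = pos; i <= data.length - delimiter.length; i++) {
                boolean match = true;
                for (int j = 0; j < delimiter.length; j++) {
                    if (data[i + j] != delimiter[j]) {
                        match = false;
                        break;
                    }
                }
                if (match) {
                    return i + delimiter.length;
                }
            }
            return data.length;
        }

        public static void main(String[] args) throws IOException {
            byte[] data = Files.readAllBytes(Paths.get(args[0]));
            // 20-byte target splits, newline-terminated rows, mirroring the text e2e case above.
            for (Range r : split(data, "\n".getBytes(StandardCharsets.UTF_8), 20L)) {
                System.out.printf("start=%d length=%d%n", r.start, r.length);
            }
        }
    }
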
