This is an automated email from the ASF dual-hosted git repository.

lidongdai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 3f2ea78a46 [Feature][connectors-v2/connector-file]Support logical 
Parquet splits (#10239)
3f2ea78a46 is described below

commit 3f2ea78a46865c8d5b39cb4170e80e8062c50367
Author: 老王 <[email protected]>
AuthorDate: Fri Jan 9 20:56:11 2026 +0800

    [Feature][connectors-v2/connector-file]Support logical Parquet splits 
(#10239)
---
 docs/en/connector-v2/source/LocalFile.md           |   2 +-
 docs/zh/connector-v2/source/LocalFile.md           |   2 +-
 .../connector-file/connector-file-base/pom.xml     |   7 ++
 .../seatunnel/file/config/FileFormat.java          |  16 +++
 .../file/exception/FileConnectorErrorCode.java     |   4 +-
 .../file/source/reader/ParquetReadStrategy.java    |  22 +++-
 .../file/source/split/FileSplitStrategy.java       |  12 +++
 .../source/split/ParquetFileSplitStrategy.java     | 118 +++++++++++++++++++++
 .../source/split/ParquetFileSplitStrategyTest.java |  94 ++++++++++++++++
 .../file/local/source/LocalFileSource.java         |  27 +----
 .../file/local/source/LocalFileSourceFactory.java  |   6 +-
 .../LocalFileSplitStrategyFactory.java}            |  43 ++++----
 .../seatunnel/file/local/LocalFileSourceTest.java  | 103 ++++++++++++++++++
 .../e2e/connector/file/local/LocalFileIT.java      |   1 +
 .../local_file_parquet_enable_split_to_assert.conf |  98 +++++++++++++++++
 15 files changed, 501 insertions(+), 54 deletions(-)

diff --git a/docs/en/connector-v2/source/LocalFile.md 
b/docs/en/connector-v2/source/LocalFile.md
index e6de4f3033..bc2edd88f8 100644
--- a/docs/en/connector-v2/source/LocalFile.md
+++ b/docs/en/connector-v2/source/LocalFile.md
@@ -420,7 +420,7 @@ File modification time filter. The connector will filter 
some files base on the
 
 ### enable_file_split [string]
 
-Turn on the file splitting function, the default is false。It can be selected 
when the file type is csv, text, json and non-compressed format.
+Turn on the file splitting function, the default is false. It can be selected 
when the file type is csv, text, json, parquet and non-compressed format.
 
 ### file_split_size [long]
 
diff --git a/docs/zh/connector-v2/source/LocalFile.md 
b/docs/zh/connector-v2/source/LocalFile.md
index 0ff85d51e7..215669fc7b 100644
--- a/docs/zh/connector-v2/source/LocalFile.md
+++ b/docs/zh/connector-v2/source/LocalFile.md
@@ -421,7 +421,7 @@ null_format 定义哪些字符串可以表示为 null。
 
 ### enable_file_split [boolean]
 
-开启文件分割功能,默认为false。文件类型为csv、text、json、非压缩格式时可选择。
+开启文件分割功能,默认为false。文件类型为csv、text、json、parquet、非压缩格式时可选择。
 
 ### file_split_size [long]
 
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml 
b/seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml
index ca03d0963e..8742d0a47c 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml
@@ -185,6 +185,13 @@
             <artifactId>flexmark-all</artifactId>
             <version>${flexmark-all.version}</version>
         </dependency>
+
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-junit-jupiter</artifactId>
+            <version>${mockito.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java
index 57b9e01bdd..1253b82725 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java
@@ -41,9 +41,12 @@ import 
org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategy
 import 
org.apache.seatunnel.connectors.seatunnel.file.source.reader.TextReadStrategy;
 import 
org.apache.seatunnel.connectors.seatunnel.file.source.reader.XmlReadStrategy;
 
+import lombok.extern.slf4j.Slf4j;
+
 import java.io.Serializable;
 import java.util.Arrays;
 
+@Slf4j
 public enum FileFormat implements Serializable {
     CSV("csv") {
         @Override
@@ -207,4 +210,17 @@ public enum FileFormat implements Serializable {
     public WriteStrategy getWriteStrategy(FileSinkConfig fileSinkConfig) {
         return null;
     }
+
+    public boolean supportFileSplit() {
+        switch (this) {
+            case CSV:
+            case TEXT:
+            case JSON:
+            case PARQUET:
+                return true;
+            default:
+                log.info("The {} file type does not support file split", this);
+                return false;
+        }
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/exception/FileConnectorErrorCode.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/exception/FileConnectorErrorCode.java
index f5847aff14..db30dfcb8f 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/exception/FileConnectorErrorCode.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/exception/FileConnectorErrorCode.java
@@ -28,7 +28,9 @@ public enum FileConnectorErrorCode implements 
SeaTunnelErrorCode {
     FILE_READ_STRATEGY_NOT_SUPPORT("FILE-06", "File strategy not support"),
     FORMAT_NOT_SUPPORT("FILE-07", "Format not support"),
     FILE_READ_FAILED("FILE-08", "File read failed"),
-    BINARY_FILE_PART_ORDER_ERROR("FILE-09", "Binary file fragment order 
abnormality");
+    BINARY_FILE_PART_ORDER_ERROR("FILE-09", "Binary file fragment order 
abnormality"),
+    FILE_SPLIT_SIZE_ILLEGAL("FILE-10", "SplitSizeBytes must be greater than 
0"),
+    FILE_SPLIT_FAIL("FILE-11", "File split fail");
 
     private final String code;
     private final String description;
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
index 2c2e134f4c..fe1cab452a 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
@@ -33,6 +33,7 @@ import org.apache.seatunnel.common.exception.CommonError;
 import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
 import 
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
 import 
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
+import 
org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSourceSplit;
 
 import org.apache.avro.Conversions;
 import org.apache.avro.data.TimeConversions;
@@ -89,6 +90,14 @@ public class ParquetReadStrategy extends 
AbstractReadStrategy {
     @Override
     public void read(String path, String tableId, Collector<SeaTunnelRow> 
output)
             throws FileConnectorException, IOException {
+        this.read(new FileSourceSplit(path), output);
+    }
+
+    @Override
+    public void read(FileSourceSplit split, Collector<SeaTunnelRow> output)
+            throws IOException, FileConnectorException {
+        String tableId = split.getTableId();
+        String path = split.getFilePath();
         if (Boolean.FALSE.equals(checkFileType(path))) {
             String errorMsg =
                     String.format(
@@ -107,11 +116,18 @@ public class ParquetReadStrategy extends 
AbstractReadStrategy {
         dataModel.addLogicalTypeConversion(new 
Conversions.DecimalConversion());
         dataModel.addLogicalTypeConversion(new 
TimeConversions.DateConversion());
         dataModel.addLogicalTypeConversion(new 
TimeConversions.LocalTimestampMillisConversion());
+        final boolean useSplitRange =
+                enableSplitFile && split.getStart() >= 0 && split.getLength() 
> 0;
         GenericRecord record;
-        try (ParquetReader<GenericData.Record> reader =
+        AvroParquetReader.Builder<GenericData.Record> builder =
                 AvroParquetReader.<GenericData.Record>builder(hadoopInputFile)
-                        .withDataModel(dataModel)
-                        .build()) {
+                        .withDataModel(dataModel);
+        if (useSplitRange) {
+            long start = split.getStart();
+            long end = start + split.getLength();
+            builder.withFileRange(start, end);
+        }
+        try (ParquetReader<GenericData.Record> reader = builder.build()) {
             while ((record = reader.read()) != null) {
                 Object[] fields;
                 if (isMergePartition) {
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSplitStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSplitStrategy.java
index 12f7a4746f..bb5bab7e37 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSplitStrategy.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSplitStrategy.java
@@ -19,6 +19,18 @@ package 
org.apache.seatunnel.connectors.seatunnel.file.source.split;
 import java.io.Serializable;
 import java.util.List;
 
+/**
+ * {@link FileSplitStrategy} defines the contract for splitting a file into 
one or more {@link
+ * FileSourceSplit}s that can be processed in parallel by file-based sources.
+ *
+ * <p>The split strategy determines how a file is logically divided, such as 
by byte ranges, record
+ * boundaries, or format-specific physical units. Implementations are 
responsible for ensuring that
+ * each generated split is readable and does not violate the semantics of the 
underlying file
+ * format.
+ *
+ * <p>The resulting {@link FileSourceSplit}s describe the portion of the file 
to be read, while the
+ * actual data parsing and decoding are handled by the corresponding reader 
implementation.
+ */
 public interface FileSplitStrategy extends Serializable {
 
     List<FileSourceSplit> split(String tableId, String filePath);
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/ParquetFileSplitStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/ParquetFileSplitStrategy.java
new file mode 100644
index 0000000000..48b7adc102
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/ParquetFileSplitStrategy.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.source.split;
+
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+import 
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * {@link ParquetFileSplitStrategy} defines a split strategy for Parquet files 
based on Parquet
+ * physical storage units (RowGroups).
+ *
+ * <p>This strategy uses {@code RowGroup} as the minimum indivisible split 
unit and generates {@link
+ * FileSourceSplit}s by merging one or more contiguous RowGroups according to 
the configured split
+ * size. A split will never break a RowGroup, ensuring correctness and 
compatibility with Parquet
+ * readers.
+ *
+ * <p>The generated split range ({@code start}, {@code length}) represents a 
byte range covering
+ * complete RowGroups. The actual row-level reading and decoding are delegated 
to the Parquet reader
+ * implementation.
+ *
+ * <p>This design enables efficient parallel reading of Parquet files while 
preserving Parquet
+ * format semantics and avoiding invalid byte-level splits.
+ */
+public class ParquetFileSplitStrategy implements FileSplitStrategy {
+
+    private final long splitSizeBytes;
+
+    public ParquetFileSplitStrategy(long splitSizeBytes) {
+        if (splitSizeBytes <= 0) {
+            throw new SeaTunnelRuntimeException(
+                    FileConnectorErrorCode.FILE_SPLIT_SIZE_ILLEGAL,
+                    "SplitSizeBytes must be greater than 0");
+        }
+        this.splitSizeBytes = splitSizeBytes;
+    }
+
+    @Override
+    public List<FileSourceSplit> split(String tableId, String filePath) {
+        try {
+            return splitByRowGroups(tableId, filePath, 
readRowGroups(filePath));
+        } catch (IOException e) {
+            throw new 
SeaTunnelRuntimeException(FileConnectorErrorCode.FILE_SPLIT_FAIL, e);
+        }
+    }
+
+    /**
+     * Core split logic based on row group metadata. This method is IO-free 
and unit-test friendly.
+     */
+    List<FileSourceSplit> splitByRowGroups(
+            String tableId, String filePath, List<BlockMetaData> rowGroups) {
+        List<FileSourceSplit> splits = new ArrayList<>();
+        if (rowGroups == null || rowGroups.isEmpty()) {
+            return splits;
+        }
+        long currentStart = 0;
+        long currentLength = 0;
+        boolean hasOpenSplit = false;
+        for (BlockMetaData block : rowGroups) {
+            long rgStart = block.getStartingPos();
+            long rgSize = block.getCompressedSize();
+            // start a new split
+            if (!hasOpenSplit) {
+                currentStart = rgStart;
+                currentLength = rgSize;
+                hasOpenSplit = true;
+                continue;
+            }
+            // exceeds threshold, close current split
+            if (currentLength + rgSize > splitSizeBytes) {
+                splits.add(new FileSourceSplit(tableId, filePath, 
currentStart, currentLength));
+                // start next split
+                currentStart = rgStart;
+                currentLength = rgSize;
+            } else {
+                currentLength += rgSize;
+            }
+        }
+        // last split
+        if (hasOpenSplit && currentLength > 0) {
+            splits.add(new FileSourceSplit(tableId, filePath, currentStart, 
currentLength));
+        }
+        return splits;
+    }
+
+    private List<BlockMetaData> readRowGroups(String filePath) throws 
IOException {
+        Path path = new Path(filePath);
+        Configuration conf = new Configuration();
+        try (ParquetFileReader reader =
+                ParquetFileReader.open(HadoopInputFile.fromPath(path, conf))) {
+            return reader.getFooter().getBlocks();
+        }
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/ParquetFileSplitStrategyTest.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/ParquetFileSplitStrategyTest.java
new file mode 100644
index 0000000000..849aae8432
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/ParquetFileSplitStrategyTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.connectors.seatunnel.file.source.split;
+
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.mockito.Mockito.when;
+
+public class ParquetFileSplitStrategyTest {
+
+    private static final String TABLE_ID = "test.test_table";
+    private static final String FILE_PATH = "/tmp/test.parquet";
+
+    @Test
+    void testSplitByRowGroupsEmpty() {
+        ParquetFileSplitStrategy strategy = new ParquetFileSplitStrategy(100);
+        List<FileSourceSplit> splits =
+                strategy.splitByRowGroups(TABLE_ID, FILE_PATH, 
Collections.emptyList());
+        Assertions.assertTrue(splits.isEmpty());
+    }
+
+    @Test
+    void testSplitByRowGroupsSingleRowGroup() {
+        ParquetFileSplitStrategy strategy = new ParquetFileSplitStrategy(1000);
+        List<BlockMetaData> blocks = new ArrayList<>();
+        blocks.add(mockBlock(0, 200));
+        List<FileSourceSplit> splits = strategy.splitByRowGroups(TABLE_ID, 
FILE_PATH, blocks);
+        Assertions.assertEquals(1, splits.size());
+        FileSourceSplit split = splits.get(0);
+        Assertions.assertEquals(0, split.getStart());
+        Assertions.assertEquals(200, split.getLength());
+    }
+
+    @Test
+    void testSplitByRowGroupsMergeRowGroups() {
+        ParquetFileSplitStrategy strategy = new ParquetFileSplitStrategy(500);
+        List<BlockMetaData> blocks = new ArrayList<>();
+        blocks.add(mockBlock(0, 100));
+        blocks.add(mockBlock(100, 150));
+        blocks.add(mockBlock(250, 200));
+        List<FileSourceSplit> splits = strategy.splitByRowGroups(TABLE_ID, 
FILE_PATH, blocks);
+        // 100 + 150 + 200 = 450 < 500
+        Assertions.assertEquals(1, splits.size());
+        FileSourceSplit split = splits.get(0);
+        Assertions.assertEquals(0, split.getStart());
+        Assertions.assertEquals(450, split.getLength());
+    }
+
+    @Test
+    void testSplitByRowGroupsSplitWhenExceedsThreshold() {
+        ParquetFileSplitStrategy strategy = new ParquetFileSplitStrategy(300);
+        List<BlockMetaData> blocks = new ArrayList<>();
+        blocks.add(mockBlock(0, 100));
+        blocks.add(mockBlock(100, 150));
+        blocks.add(mockBlock(250, 200));
+        List<FileSourceSplit> splits = strategy.splitByRowGroups(TABLE_ID, 
FILE_PATH, blocks);
+        Assertions.assertEquals(2, splits.size());
+        FileSourceSplit first = splits.get(0);
+        Assertions.assertEquals(0, first.getStart());
+        Assertions.assertEquals(250, first.getLength());
+        FileSourceSplit second = splits.get(1);
+        Assertions.assertEquals(250, second.getStart());
+        Assertions.assertEquals(200, second.getLength());
+    }
+
+    private BlockMetaData mockBlock(long start, long compressedSize) {
+        BlockMetaData block = Mockito.mock(BlockMetaData.class);
+        when(block.getStartingPos()).thenReturn(start);
+        when(block.getCompressedSize()).thenReturn(compressedSize);
+        return block;
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java
index f5e3ff74b5..bf3311efab 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java
@@ -18,44 +18,21 @@
 package org.apache.seatunnel.connectors.seatunnel.file.local.source;
 
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
-import 
org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSourceOptions;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
 import 
org.apache.seatunnel.connectors.seatunnel.file.local.source.config.MultipleTableLocalFileSourceConfig;
-import 
org.apache.seatunnel.connectors.seatunnel.file.local.source.split.LocalFileAccordingToSplitSizeSplitStrategy;
+import 
org.apache.seatunnel.connectors.seatunnel.file.local.source.split.LocalFileSplitStrategyFactory;
 import 
org.apache.seatunnel.connectors.seatunnel.file.source.BaseMultipleTableFileSource;
-import 
org.apache.seatunnel.connectors.seatunnel.file.source.split.DefaultFileSplitStrategy;
-import 
org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSplitStrategy;
-
-import static 
org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSourceOptions.DEFAULT_ROW_DELIMITER;
 
 public class LocalFileSource extends BaseMultipleTableFileSource {
 
     public LocalFileSource(ReadonlyConfig readonlyConfig) {
         super(
                 new MultipleTableLocalFileSourceConfig(readonlyConfig),
-                initFileSplitStrategy(readonlyConfig));
+                
LocalFileSplitStrategyFactory.initFileSplitStrategy(readonlyConfig));
     }
 
     @Override
     public String getPluginName() {
         return FileSystemType.LOCAL.getFileSystemPluginName();
     }
-
-    private static FileSplitStrategy initFileSplitStrategy(ReadonlyConfig 
readonlyConfig) {
-        if (readonlyConfig.get(FileBaseSourceOptions.ENABLE_FILE_SPLIT)) {
-            return new DefaultFileSplitStrategy();
-        }
-        String rowDelimiter =
-                
!readonlyConfig.getOptional(FileBaseSourceOptions.ROW_DELIMITER).isPresent()
-                        ? DEFAULT_ROW_DELIMITER
-                        : 
readonlyConfig.get(FileBaseSourceOptions.ROW_DELIMITER);
-        long skipHeaderRowNumber =
-                readonlyConfig.get(FileBaseSourceOptions.CSV_USE_HEADER_LINE)
-                        ? 1L
-                        : 
readonlyConfig.get(FileBaseSourceOptions.SKIP_HEADER_ROW_NUMBER);
-        String encodingName = 
readonlyConfig.get(FileBaseSourceOptions.ENCODING);
-        long splitSize = 
readonlyConfig.get(FileBaseSourceOptions.FILE_SPLIT_SIZE);
-        return new LocalFileAccordingToSplitSizeSplitStrategy(
-                rowDelimiter, skipHeaderRowNumber, encodingName, splitSize);
-    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java
index 9672f2cd80..987558810e 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java
@@ -86,7 +86,11 @@ public class LocalFileSourceFactory implements 
TableSourceFactory {
                         FileBaseSourceOptions.ENCODING)
                 .conditional(
                         FileBaseSourceOptions.FILE_FORMAT_TYPE,
-                        Arrays.asList(FileFormat.TEXT, FileFormat.JSON, 
FileFormat.CSV),
+                        Arrays.asList(
+                                FileFormat.TEXT,
+                                FileFormat.JSON,
+                                FileFormat.CSV,
+                                FileFormat.PARQUET),
                         FileBaseSourceOptions.ENABLE_FILE_SPLIT)
                 .conditional(
                         FileBaseSourceOptions.ENABLE_FILE_SPLIT,
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/split/LocalFileSplitStrategyFactory.java
similarity index 63%
copy from 
seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java
copy to 
seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/split/LocalFileSplitStrategyFactory.java
index f5e3ff74b5..d6b632e164 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/split/LocalFileSplitStrategyFactory.java
@@ -14,37 +14,37 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
-package org.apache.seatunnel.connectors.seatunnel.file.local.source;
+package org.apache.seatunnel.connectors.seatunnel.file.local.source.split;
 
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.file.config.ArchiveCompressFormat;
+import org.apache.seatunnel.connectors.seatunnel.file.config.CompressFormat;
 import 
org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSourceOptions;
-import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
-import 
org.apache.seatunnel.connectors.seatunnel.file.local.source.config.MultipleTableLocalFileSourceConfig;
-import 
org.apache.seatunnel.connectors.seatunnel.file.local.source.split.LocalFileAccordingToSplitSizeSplitStrategy;
-import 
org.apache.seatunnel.connectors.seatunnel.file.source.BaseMultipleTableFileSource;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
 import 
org.apache.seatunnel.connectors.seatunnel.file.source.split.DefaultFileSplitStrategy;
 import 
org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSplitStrategy;
+import 
org.apache.seatunnel.connectors.seatunnel.file.source.split.ParquetFileSplitStrategy;
 
 import static 
org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSourceOptions.DEFAULT_ROW_DELIMITER;
 
-public class LocalFileSource extends BaseMultipleTableFileSource {
-
-    public LocalFileSource(ReadonlyConfig readonlyConfig) {
-        super(
-                new MultipleTableLocalFileSourceConfig(readonlyConfig),
-                initFileSplitStrategy(readonlyConfig));
-    }
-
-    @Override
-    public String getPluginName() {
-        return FileSystemType.LOCAL.getFileSystemPluginName();
-    }
+public class LocalFileSplitStrategyFactory {
 
-    private static FileSplitStrategy initFileSplitStrategy(ReadonlyConfig 
readonlyConfig) {
-        if (readonlyConfig.get(FileBaseSourceOptions.ENABLE_FILE_SPLIT)) {
+    public static FileSplitStrategy initFileSplitStrategy(ReadonlyConfig 
readonlyConfig) {
+        if (!readonlyConfig.get(FileBaseSourceOptions.ENABLE_FILE_SPLIT)) {
             return new DefaultFileSplitStrategy();
         }
+        if 
(!readonlyConfig.get(FileBaseSourceOptions.FILE_FORMAT_TYPE).supportFileSplit())
 {
+            return new DefaultFileSplitStrategy();
+        }
+        if (readonlyConfig.get(FileBaseSourceOptions.COMPRESS_CODEC) != 
CompressFormat.NONE
+                || 
readonlyConfig.get(FileBaseSourceOptions.ARCHIVE_COMPRESS_CODEC)
+                        != ArchiveCompressFormat.NONE) {
+            return new DefaultFileSplitStrategy();
+        }
+        long fileSplitSize = 
readonlyConfig.get(FileBaseSourceOptions.FILE_SPLIT_SIZE);
+        if (FileFormat.PARQUET == 
readonlyConfig.get(FileBaseSourceOptions.FILE_FORMAT_TYPE)) {
+            return new ParquetFileSplitStrategy(fileSplitSize);
+        }
         String rowDelimiter =
                 
!readonlyConfig.getOptional(FileBaseSourceOptions.ROW_DELIMITER).isPresent()
                         ? DEFAULT_ROW_DELIMITER
@@ -54,8 +54,7 @@ public class LocalFileSource extends 
BaseMultipleTableFileSource {
                         ? 1L
                         : 
readonlyConfig.get(FileBaseSourceOptions.SKIP_HEADER_ROW_NUMBER);
         String encodingName = 
readonlyConfig.get(FileBaseSourceOptions.ENCODING);
-        long splitSize = 
readonlyConfig.get(FileBaseSourceOptions.FILE_SPLIT_SIZE);
         return new LocalFileAccordingToSplitSizeSplitStrategy(
-                rowDelimiter, skipHeaderRowNumber, encodingName, splitSize);
+                rowDelimiter, skipHeaderRowNumber, encodingName, 
fileSplitSize);
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/local/LocalFileSourceTest.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/local/LocalFileSourceTest.java
new file mode 100644
index 0000000000..379f2303a5
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/local/LocalFileSourceTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.connectors.seatunnel.file.local;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.file.config.ArchiveCompressFormat;
+import org.apache.seatunnel.connectors.seatunnel.file.config.CompressFormat;
+import 
org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSourceOptions;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
+import 
org.apache.seatunnel.connectors.seatunnel.file.local.source.split.LocalFileAccordingToSplitSizeSplitStrategy;
+import 
org.apache.seatunnel.connectors.seatunnel.file.local.source.split.LocalFileSplitStrategyFactory;
+import 
org.apache.seatunnel.connectors.seatunnel.file.source.split.DefaultFileSplitStrategy;
+import 
org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSplitStrategy;
+import 
org.apache.seatunnel.connectors.seatunnel.file.source.split.ParquetFileSplitStrategy;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class LocalFileSourceTest {
+
+    @Test
+    void testInitFileSplitStrategy() {
+        // test orc
+        Map<String, Object> map = new HashMap<>();
+        map.put(FileBaseSourceOptions.FILE_FORMAT_TYPE.key(), FileFormat.ORC);
+        map.put(FileBaseSourceOptions.ENABLE_FILE_SPLIT.key(), true);
+        FileSplitStrategy fileSplitStrategy =
+                
LocalFileSplitStrategyFactory.initFileSplitStrategy(ReadonlyConfig.fromMap(map));
+        Assertions.assertInstanceOf(DefaultFileSplitStrategy.class, 
fileSplitStrategy);
+        // test text, no split
+        Map<String, Object> map1 = new HashMap<>();
+        map1.put(FileBaseSourceOptions.FILE_FORMAT_TYPE.key(), 
FileFormat.TEXT);
+        fileSplitStrategy =
+                
LocalFileSplitStrategyFactory.initFileSplitStrategy(ReadonlyConfig.fromMap(map1));
+        Assertions.assertInstanceOf(DefaultFileSplitStrategy.class, 
fileSplitStrategy);
+        // test text, split
+        Map<String, Object> map2 = new HashMap<>();
+        map2.put(FileBaseSourceOptions.FILE_FORMAT_TYPE.key(), 
FileFormat.TEXT);
+        map2.put(FileBaseSourceOptions.ENABLE_FILE_SPLIT.key(), true);
+        fileSplitStrategy =
+                
LocalFileSplitStrategyFactory.initFileSplitStrategy(ReadonlyConfig.fromMap(map2));
+        Assertions.assertInstanceOf(
+                LocalFileAccordingToSplitSizeSplitStrategy.class, 
fileSplitStrategy);
+        // test csv, split
+        Map<String, Object> map3 = new HashMap<>();
+        map3.put(FileBaseSourceOptions.FILE_FORMAT_TYPE.key(), FileFormat.CSV);
+        map3.put(FileBaseSourceOptions.ENABLE_FILE_SPLIT.key(), true);
+        fileSplitStrategy =
+                
LocalFileSplitStrategyFactory.initFileSplitStrategy(ReadonlyConfig.fromMap(map3));
+        Assertions.assertInstanceOf(
+                LocalFileAccordingToSplitSizeSplitStrategy.class, 
fileSplitStrategy);
+        // test json, split
+        Map<String, Object> map4 = new HashMap<>();
+        map4.put(FileBaseSourceOptions.FILE_FORMAT_TYPE.key(), 
FileFormat.JSON);
+        map4.put(FileBaseSourceOptions.ENABLE_FILE_SPLIT.key(), true);
+        fileSplitStrategy =
+                
LocalFileSplitStrategyFactory.initFileSplitStrategy(ReadonlyConfig.fromMap(map4));
+        Assertions.assertInstanceOf(
+                LocalFileAccordingToSplitSizeSplitStrategy.class, 
fileSplitStrategy);
+        // test parquet, split
+        Map<String, Object> map5 = new HashMap<>();
+        map5.put(FileBaseSourceOptions.FILE_FORMAT_TYPE.key(), 
FileFormat.PARQUET);
+        map5.put(FileBaseSourceOptions.ENABLE_FILE_SPLIT.key(), true);
+        fileSplitStrategy =
+                
LocalFileSplitStrategyFactory.initFileSplitStrategy(ReadonlyConfig.fromMap(map5));
+        Assertions.assertInstanceOf(ParquetFileSplitStrategy.class, 
fileSplitStrategy);
+        // test compress 1
+        Map<String, Object> map6 = new HashMap<>();
+        map6.put(FileBaseSourceOptions.FILE_FORMAT_TYPE.key(), 
FileFormat.PARQUET);
+        map6.put(FileBaseSourceOptions.ENABLE_FILE_SPLIT.key(), true);
+        map6.put(FileBaseSourceOptions.COMPRESS_CODEC.key(), 
CompressFormat.LZO);
+        map6.put(FileBaseSourceOptions.ARCHIVE_COMPRESS_CODEC.key(), 
ArchiveCompressFormat.NONE);
+        fileSplitStrategy =
+                
LocalFileSplitStrategyFactory.initFileSplitStrategy(ReadonlyConfig.fromMap(map6));
+        Assertions.assertInstanceOf(DefaultFileSplitStrategy.class, 
fileSplitStrategy);
+        // test compress 2
+        Map<String, Object> map7 = new HashMap<>();
+        map7.put(FileBaseSourceOptions.FILE_FORMAT_TYPE.key(), 
FileFormat.PARQUET);
+        map7.put(FileBaseSourceOptions.ENABLE_FILE_SPLIT.key(), true);
+        map7.put(FileBaseSourceOptions.COMPRESS_CODEC.key(), 
CompressFormat.NONE);
+        map7.put(FileBaseSourceOptions.ARCHIVE_COMPRESS_CODEC.key(), 
ArchiveCompressFormat.NONE);
+        fileSplitStrategy =
+                
LocalFileSplitStrategyFactory.initFileSplitStrategy(ReadonlyConfig.fromMap(map7));
+        Assertions.assertInstanceOf(ParquetFileSplitStrategy.class, 
fileSplitStrategy);
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java
index e29448a1b7..bc1fb26d93 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java
@@ -370,6 +370,7 @@ public class LocalFileIT extends TestSuiteBase {
         helper.execute("/parquet/fake_to_local_file_parquet.conf");
         // test read local parquet file
         helper.execute("/parquet/local_file_parquet_to_assert.conf");
+        
helper.execute("/parquet/local_file_parquet_enable_split_to_assert.conf");
         // test read local parquet file with projection
         
helper.execute("/parquet/local_file_parquet_projection_to_assert.conf");
         // test read filtered local file
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/parquet/local_file_parquet_enable_split_to_assert.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/parquet/local_file_parquet_enable_split_to_assert.conf
new file mode 100644
index 0000000000..6baece1205
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/parquet/local_file_parquet_enable_split_to_assert.conf
@@ -0,0 +1,98 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+
+  # You can set spark configuration here
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+}
+
+source {
+  LocalFile {
+    path = "/seatunnel/read/parquet"
+    file_format_type = "parquet"
+    enable_file_split = true
+    file_split_size = 3
+    plugin_output = "fake"
+  }
+}
+
+sink {
+  Assert {
+    rules {
+      row_rules = [
+        {
+          rule_type = MAX_ROW
+          rule_value = 5
+        }
+      ],
+      field_rules = [
+        {
+          field_name = c_string
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = c_boolean
+          field_type = boolean
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = c_double
+          field_type = double
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = name
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = hobby
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        }
+      ]
+    }
+  }
+}
\ No newline at end of file


Reply via email to