This is an automated email from the ASF dual-hosted git repository.
lidongdai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 3f2ea78a46 [Feature][connectors-v2/connector-file]Support logical
Parquet splits (#10239)
3f2ea78a46 is described below
commit 3f2ea78a46865c8d5b39cb4170e80e8062c50367
Author: 老王 <[email protected]>
AuthorDate: Fri Jan 9 20:56:11 2026 +0800
[Feature][connectors-v2/connector-file]Support logical Parquet splits
(#10239)
---
docs/en/connector-v2/source/LocalFile.md | 2 +-
docs/zh/connector-v2/source/LocalFile.md | 2 +-
.../connector-file/connector-file-base/pom.xml | 7 ++
.../seatunnel/file/config/FileFormat.java | 16 +++
.../file/exception/FileConnectorErrorCode.java | 4 +-
.../file/source/reader/ParquetReadStrategy.java | 22 +++-
.../file/source/split/FileSplitStrategy.java | 12 +++
.../source/split/ParquetFileSplitStrategy.java | 118 +++++++++++++++++++++
.../source/split/ParquetFileSplitStrategyTest.java | 94 ++++++++++++++++
.../file/local/source/LocalFileSource.java | 27 +----
.../file/local/source/LocalFileSourceFactory.java | 6 +-
.../LocalFileSplitStrategyFactory.java} | 43 ++++----
.../seatunnel/file/local/LocalFileSourceTest.java | 103 ++++++++++++++++++
.../e2e/connector/file/local/LocalFileIT.java | 1 +
.../local_file_parquet_enable_split_to_assert.conf | 98 +++++++++++++++++
15 files changed, 501 insertions(+), 54 deletions(-)
diff --git a/docs/en/connector-v2/source/LocalFile.md
b/docs/en/connector-v2/source/LocalFile.md
index e6de4f3033..bc2edd88f8 100644
--- a/docs/en/connector-v2/source/LocalFile.md
+++ b/docs/en/connector-v2/source/LocalFile.md
@@ -420,7 +420,7 @@ File modification time filter. The connector will filter
some files base on the
### enable_file_split [string]
-Turn on the file splitting function, the default is false。It can be selected
when the file type is csv, text, json and non-compressed format.
+Turn on the file splitting function, the default is false. It can be selected
when the file type is csv, text, json, parquet and non-compressed format.
### file_split_size [long]
diff --git a/docs/zh/connector-v2/source/LocalFile.md
b/docs/zh/connector-v2/source/LocalFile.md
index 0ff85d51e7..215669fc7b 100644
--- a/docs/zh/connector-v2/source/LocalFile.md
+++ b/docs/zh/connector-v2/source/LocalFile.md
@@ -421,7 +421,7 @@ null_format 定义哪些字符串可以表示为 null。
### enable_file_split [boolean]
-开启文件分割功能,默认为false。文件类型为csv、text、json、非压缩格式时可选择。
+开启文件分割功能,默认为false。文件类型为csv、text、json、parquet、非压缩格式时可选择。
### file_split_size [long]
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml
b/seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml
index ca03d0963e..8742d0a47c 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml
@@ -185,6 +185,13 @@
<artifactId>flexmark-all</artifactId>
<version>${flexmark-all.version}</version>
</dependency>
+
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-junit-jupiter</artifactId>
+ <version>${mockito.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java
index 57b9e01bdd..1253b82725 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java
@@ -41,9 +41,12 @@ import
org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategy
import
org.apache.seatunnel.connectors.seatunnel.file.source.reader.TextReadStrategy;
import
org.apache.seatunnel.connectors.seatunnel.file.source.reader.XmlReadStrategy;
+import lombok.extern.slf4j.Slf4j;
+
import java.io.Serializable;
import java.util.Arrays;
+@Slf4j
public enum FileFormat implements Serializable {
CSV("csv") {
@Override
@@ -207,4 +210,17 @@ public enum FileFormat implements Serializable {
public WriteStrategy getWriteStrategy(FileSinkConfig fileSinkConfig) {
return null;
}
+
+ public boolean supportFileSplit() {
+ switch (this) {
+ case CSV:
+ case TEXT:
+ case JSON:
+ case PARQUET:
+ return true;
+ default:
+ log.info("The {} file type does not support file split", this);
+ return false;
+ }
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/exception/FileConnectorErrorCode.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/exception/FileConnectorErrorCode.java
index f5847aff14..db30dfcb8f 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/exception/FileConnectorErrorCode.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/exception/FileConnectorErrorCode.java
@@ -28,7 +28,9 @@ public enum FileConnectorErrorCode implements
SeaTunnelErrorCode {
FILE_READ_STRATEGY_NOT_SUPPORT("FILE-06", "File strategy not support"),
FORMAT_NOT_SUPPORT("FILE-07", "Format not support"),
FILE_READ_FAILED("FILE-08", "File read failed"),
- BINARY_FILE_PART_ORDER_ERROR("FILE-09", "Binary file fragment order
abnormality");
+ BINARY_FILE_PART_ORDER_ERROR("FILE-09", "Binary file fragment order
abnormality"),
+ FILE_SPLIT_SIZE_ILLEGAL("FILE-10", "SplitSizeBytes must be greater than
0"),
+ FILE_SPLIT_FAIL("FILE-11", "File split fail");
private final String code;
private final String description;
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
index 2c2e134f4c..fe1cab452a 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
@@ -33,6 +33,7 @@ import org.apache.seatunnel.common.exception.CommonError;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
+import
org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSourceSplit;
import org.apache.avro.Conversions;
import org.apache.avro.data.TimeConversions;
@@ -89,6 +90,14 @@ public class ParquetReadStrategy extends
AbstractReadStrategy {
@Override
public void read(String path, String tableId, Collector<SeaTunnelRow>
output)
throws FileConnectorException, IOException {
+ this.read(new FileSourceSplit(path), output);
+ }
+
+ @Override
+ public void read(FileSourceSplit split, Collector<SeaTunnelRow> output)
+ throws IOException, FileConnectorException {
+ String tableId = split.getTableId();
+ String path = split.getFilePath();
if (Boolean.FALSE.equals(checkFileType(path))) {
String errorMsg =
String.format(
@@ -107,11 +116,18 @@ public class ParquetReadStrategy extends
AbstractReadStrategy {
dataModel.addLogicalTypeConversion(new
Conversions.DecimalConversion());
dataModel.addLogicalTypeConversion(new
TimeConversions.DateConversion());
dataModel.addLogicalTypeConversion(new
TimeConversions.LocalTimestampMillisConversion());
+ final boolean useSplitRange =
+ enableSplitFile && split.getStart() >= 0 && split.getLength()
> 0;
GenericRecord record;
- try (ParquetReader<GenericData.Record> reader =
+ AvroParquetReader.Builder<GenericData.Record> builder =
AvroParquetReader.<GenericData.Record>builder(hadoopInputFile)
- .withDataModel(dataModel)
- .build()) {
+ .withDataModel(dataModel);
+ if (useSplitRange) {
+ long start = split.getStart();
+ long end = start + split.getLength();
+ builder.withFileRange(start, end);
+ }
+ try (ParquetReader<GenericData.Record> reader = builder.build()) {
while ((record = reader.read()) != null) {
Object[] fields;
if (isMergePartition) {
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSplitStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSplitStrategy.java
index 12f7a4746f..bb5bab7e37 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSplitStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSplitStrategy.java
@@ -19,6 +19,18 @@ package
org.apache.seatunnel.connectors.seatunnel.file.source.split;
import java.io.Serializable;
import java.util.List;
+/**
+ * {@link FileSplitStrategy} defines the contract for splitting a file into
one or more {@link
+ * FileSourceSplit}s that can be processed in parallel by file-based sources.
+ *
+ * <p>The split strategy determines how a file is logically divided, such as
by byte ranges, record
+ * boundaries, or format-specific physical units. Implementations are
responsible for ensuring that
+ * each generated split is readable and does not violate the semantics of the
underlying file
+ * format.
+ *
+ * <p>The resulting {@link FileSourceSplit}s describe the portion of the file
to be read, while the
+ * actual data parsing and decoding are handled by the corresponding reader
implementation.
+ */
public interface FileSplitStrategy extends Serializable {
List<FileSourceSplit> split(String tableId, String filePath);
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/ParquetFileSplitStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/ParquetFileSplitStrategy.java
new file mode 100644
index 0000000000..48b7adc102
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/ParquetFileSplitStrategy.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.source.split;
+
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * {@link ParquetFileSplitStrategy} defines a split strategy for Parquet files
based on Parquet
+ * physical storage units (RowGroups).
+ *
+ * <p>This strategy uses {@code RowGroup} as the minimum indivisible split
unit and generates {@link
+ * FileSourceSplit}s by merging one or more contiguous RowGroups according to
the configured split
+ * size. A split will never break a RowGroup, ensuring correctness and
compatibility with Parquet
+ * readers.
+ *
+ * <p>The generated split range ({@code start}, {@code length}) represents a
byte range covering
+ * complete RowGroups. The actual row-level reading and decoding are delegated
to the Parquet reader
+ * implementation.
+ *
+ * <p>This design enables efficient parallel reading of Parquet files while
preserving Parquet
+ * format semantics and avoiding invalid byte-level splits.
+ */
+public class ParquetFileSplitStrategy implements FileSplitStrategy {
+
+ private final long splitSizeBytes;
+
+ public ParquetFileSplitStrategy(long splitSizeBytes) {
+ if (splitSizeBytes <= 0) {
+ throw new SeaTunnelRuntimeException(
+ FileConnectorErrorCode.FILE_SPLIT_SIZE_ILLEGAL,
+ "SplitSizeBytes must be greater than 0");
+ }
+ this.splitSizeBytes = splitSizeBytes;
+ }
+
+ @Override
+ public List<FileSourceSplit> split(String tableId, String filePath) {
+ try {
+ return splitByRowGroups(tableId, filePath,
readRowGroups(filePath));
+ } catch (IOException e) {
+ throw new
SeaTunnelRuntimeException(FileConnectorErrorCode.FILE_SPLIT_FAIL, e);
+ }
+ }
+
+ /**
+ * Core split logic based on row group metadata. This method is IO-free
and unit-test friendly.
+ */
+ List<FileSourceSplit> splitByRowGroups(
+ String tableId, String filePath, List<BlockMetaData> rowGroups) {
+ List<FileSourceSplit> splits = new ArrayList<>();
+ if (rowGroups == null || rowGroups.isEmpty()) {
+ return splits;
+ }
+ long currentStart = 0;
+ long currentLength = 0;
+ boolean hasOpenSplit = false;
+ for (BlockMetaData block : rowGroups) {
+ long rgStart = block.getStartingPos();
+ long rgSize = block.getCompressedSize();
+ // start a new split
+ if (!hasOpenSplit) {
+ currentStart = rgStart;
+ currentLength = rgSize;
+ hasOpenSplit = true;
+ continue;
+ }
+ // exceeds threshold, close current split
+ if (currentLength + rgSize > splitSizeBytes) {
+ splits.add(new FileSourceSplit(tableId, filePath,
currentStart, currentLength));
+ // start next split
+ currentStart = rgStart;
+ currentLength = rgSize;
+ } else {
+ currentLength += rgSize;
+ }
+ }
+ // last split
+ if (hasOpenSplit && currentLength > 0) {
+ splits.add(new FileSourceSplit(tableId, filePath, currentStart,
currentLength));
+ }
+ return splits;
+ }
+
+ private List<BlockMetaData> readRowGroups(String filePath) throws
IOException {
+ Path path = new Path(filePath);
+ Configuration conf = new Configuration();
+ try (ParquetFileReader reader =
+ ParquetFileReader.open(HadoopInputFile.fromPath(path, conf))) {
+ return reader.getFooter().getBlocks();
+ }
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/ParquetFileSplitStrategyTest.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/ParquetFileSplitStrategyTest.java
new file mode 100644
index 0000000000..849aae8432
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/ParquetFileSplitStrategyTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.connectors.seatunnel.file.source.split;
+
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.mockito.Mockito.when;
+
+public class ParquetFileSplitStrategyTest {
+
+ private static final String TABLE_ID = "test.test_table";
+ private static final String FILE_PATH = "/tmp/test.parquet";
+
+ @Test
+ void testSplitByRowGroupsEmpty() {
+ ParquetFileSplitStrategy strategy = new ParquetFileSplitStrategy(100);
+ List<FileSourceSplit> splits =
+ strategy.splitByRowGroups(TABLE_ID, FILE_PATH,
Collections.emptyList());
+ Assertions.assertTrue(splits.isEmpty());
+ }
+
+ @Test
+ void testSplitByRowGroupsSingleRowGroup() {
+ ParquetFileSplitStrategy strategy = new ParquetFileSplitStrategy(1000);
+ List<BlockMetaData> blocks = new ArrayList<>();
+ blocks.add(mockBlock(0, 200));
+ List<FileSourceSplit> splits = strategy.splitByRowGroups(TABLE_ID,
FILE_PATH, blocks);
+ Assertions.assertEquals(1, splits.size());
+ FileSourceSplit split = splits.get(0);
+ Assertions.assertEquals(0, split.getStart());
+ Assertions.assertEquals(200, split.getLength());
+ }
+
+ @Test
+ void testSplitByRowGroupsMergeRowGroups() {
+ ParquetFileSplitStrategy strategy = new ParquetFileSplitStrategy(500);
+ List<BlockMetaData> blocks = new ArrayList<>();
+ blocks.add(mockBlock(0, 100));
+ blocks.add(mockBlock(100, 150));
+ blocks.add(mockBlock(250, 200));
+ List<FileSourceSplit> splits = strategy.splitByRowGroups(TABLE_ID,
FILE_PATH, blocks);
+ // 100 + 150 + 200 = 450 < 500
+ Assertions.assertEquals(1, splits.size());
+ FileSourceSplit split = splits.get(0);
+ Assertions.assertEquals(0, split.getStart());
+ Assertions.assertEquals(450, split.getLength());
+ }
+
+ @Test
+ void testSplitByRowGroupsSplitWhenExceedsThreshold() {
+ ParquetFileSplitStrategy strategy = new ParquetFileSplitStrategy(300);
+ List<BlockMetaData> blocks = new ArrayList<>();
+ blocks.add(mockBlock(0, 100));
+ blocks.add(mockBlock(100, 150));
+ blocks.add(mockBlock(250, 200));
+ List<FileSourceSplit> splits = strategy.splitByRowGroups(TABLE_ID,
FILE_PATH, blocks);
+ Assertions.assertEquals(2, splits.size());
+ FileSourceSplit first = splits.get(0);
+ Assertions.assertEquals(0, first.getStart());
+ Assertions.assertEquals(250, first.getLength());
+ FileSourceSplit second = splits.get(1);
+ Assertions.assertEquals(250, second.getStart());
+ Assertions.assertEquals(200, second.getLength());
+ }
+
+ private BlockMetaData mockBlock(long start, long compressedSize) {
+ BlockMetaData block = Mockito.mock(BlockMetaData.class);
+ when(block.getStartingPos()).thenReturn(start);
+ when(block.getCompressedSize()).thenReturn(compressedSize);
+ return block;
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java
index f5e3ff74b5..bf3311efab 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java
@@ -18,44 +18,21 @@
package org.apache.seatunnel.connectors.seatunnel.file.local.source;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
-import
org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSourceOptions;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
import
org.apache.seatunnel.connectors.seatunnel.file.local.source.config.MultipleTableLocalFileSourceConfig;
-import
org.apache.seatunnel.connectors.seatunnel.file.local.source.split.LocalFileAccordingToSplitSizeSplitStrategy;
+import
org.apache.seatunnel.connectors.seatunnel.file.local.source.split.LocalFileSplitStrategyFactory;
import
org.apache.seatunnel.connectors.seatunnel.file.source.BaseMultipleTableFileSource;
-import
org.apache.seatunnel.connectors.seatunnel.file.source.split.DefaultFileSplitStrategy;
-import
org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSplitStrategy;
-
-import static
org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSourceOptions.DEFAULT_ROW_DELIMITER;
public class LocalFileSource extends BaseMultipleTableFileSource {
public LocalFileSource(ReadonlyConfig readonlyConfig) {
super(
new MultipleTableLocalFileSourceConfig(readonlyConfig),
- initFileSplitStrategy(readonlyConfig));
+
LocalFileSplitStrategyFactory.initFileSplitStrategy(readonlyConfig));
}
@Override
public String getPluginName() {
return FileSystemType.LOCAL.getFileSystemPluginName();
}
-
- private static FileSplitStrategy initFileSplitStrategy(ReadonlyConfig
readonlyConfig) {
- if (readonlyConfig.get(FileBaseSourceOptions.ENABLE_FILE_SPLIT)) {
- return new DefaultFileSplitStrategy();
- }
- String rowDelimiter =
-
!readonlyConfig.getOptional(FileBaseSourceOptions.ROW_DELIMITER).isPresent()
- ? DEFAULT_ROW_DELIMITER
- :
readonlyConfig.get(FileBaseSourceOptions.ROW_DELIMITER);
- long skipHeaderRowNumber =
- readonlyConfig.get(FileBaseSourceOptions.CSV_USE_HEADER_LINE)
- ? 1L
- :
readonlyConfig.get(FileBaseSourceOptions.SKIP_HEADER_ROW_NUMBER);
- String encodingName =
readonlyConfig.get(FileBaseSourceOptions.ENCODING);
- long splitSize =
readonlyConfig.get(FileBaseSourceOptions.FILE_SPLIT_SIZE);
- return new LocalFileAccordingToSplitSizeSplitStrategy(
- rowDelimiter, skipHeaderRowNumber, encodingName, splitSize);
- }
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java
index 9672f2cd80..987558810e 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java
@@ -86,7 +86,11 @@ public class LocalFileSourceFactory implements
TableSourceFactory {
FileBaseSourceOptions.ENCODING)
.conditional(
FileBaseSourceOptions.FILE_FORMAT_TYPE,
- Arrays.asList(FileFormat.TEXT, FileFormat.JSON,
FileFormat.CSV),
+ Arrays.asList(
+ FileFormat.TEXT,
+ FileFormat.JSON,
+ FileFormat.CSV,
+ FileFormat.PARQUET),
FileBaseSourceOptions.ENABLE_FILE_SPLIT)
.conditional(
FileBaseSourceOptions.ENABLE_FILE_SPLIT,
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/split/LocalFileSplitStrategyFactory.java
similarity index 63%
copy from
seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java
copy to
seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/split/LocalFileSplitStrategyFactory.java
index f5e3ff74b5..d6b632e164 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/split/LocalFileSplitStrategyFactory.java
@@ -14,37 +14,37 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
-package org.apache.seatunnel.connectors.seatunnel.file.local.source;
+package org.apache.seatunnel.connectors.seatunnel.file.local.source.split;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import
org.apache.seatunnel.connectors.seatunnel.file.config.ArchiveCompressFormat;
+import org.apache.seatunnel.connectors.seatunnel.file.config.CompressFormat;
import
org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSourceOptions;
-import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
-import
org.apache.seatunnel.connectors.seatunnel.file.local.source.config.MultipleTableLocalFileSourceConfig;
-import
org.apache.seatunnel.connectors.seatunnel.file.local.source.split.LocalFileAccordingToSplitSizeSplitStrategy;
-import
org.apache.seatunnel.connectors.seatunnel.file.source.BaseMultipleTableFileSource;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
import
org.apache.seatunnel.connectors.seatunnel.file.source.split.DefaultFileSplitStrategy;
import
org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSplitStrategy;
+import
org.apache.seatunnel.connectors.seatunnel.file.source.split.ParquetFileSplitStrategy;
import static
org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSourceOptions.DEFAULT_ROW_DELIMITER;
-public class LocalFileSource extends BaseMultipleTableFileSource {
-
- public LocalFileSource(ReadonlyConfig readonlyConfig) {
- super(
- new MultipleTableLocalFileSourceConfig(readonlyConfig),
- initFileSplitStrategy(readonlyConfig));
- }
-
- @Override
- public String getPluginName() {
- return FileSystemType.LOCAL.getFileSystemPluginName();
- }
+public class LocalFileSplitStrategyFactory {
- private static FileSplitStrategy initFileSplitStrategy(ReadonlyConfig
readonlyConfig) {
- if (readonlyConfig.get(FileBaseSourceOptions.ENABLE_FILE_SPLIT)) {
+ public static FileSplitStrategy initFileSplitStrategy(ReadonlyConfig
readonlyConfig) {
+ if (!readonlyConfig.get(FileBaseSourceOptions.ENABLE_FILE_SPLIT)) {
return new DefaultFileSplitStrategy();
}
+ if
(!readonlyConfig.get(FileBaseSourceOptions.FILE_FORMAT_TYPE).supportFileSplit())
{
+ return new DefaultFileSplitStrategy();
+ }
+ if (readonlyConfig.get(FileBaseSourceOptions.COMPRESS_CODEC) !=
CompressFormat.NONE
+ ||
readonlyConfig.get(FileBaseSourceOptions.ARCHIVE_COMPRESS_CODEC)
+ != ArchiveCompressFormat.NONE) {
+ return new DefaultFileSplitStrategy();
+ }
+ long fileSplitSize =
readonlyConfig.get(FileBaseSourceOptions.FILE_SPLIT_SIZE);
+ if (FileFormat.PARQUET ==
readonlyConfig.get(FileBaseSourceOptions.FILE_FORMAT_TYPE)) {
+ return new ParquetFileSplitStrategy(fileSplitSize);
+ }
String rowDelimiter =
!readonlyConfig.getOptional(FileBaseSourceOptions.ROW_DELIMITER).isPresent()
? DEFAULT_ROW_DELIMITER
@@ -54,8 +54,7 @@ public class LocalFileSource extends
BaseMultipleTableFileSource {
? 1L
:
readonlyConfig.get(FileBaseSourceOptions.SKIP_HEADER_ROW_NUMBER);
String encodingName =
readonlyConfig.get(FileBaseSourceOptions.ENCODING);
- long splitSize =
readonlyConfig.get(FileBaseSourceOptions.FILE_SPLIT_SIZE);
return new LocalFileAccordingToSplitSizeSplitStrategy(
- rowDelimiter, skipHeaderRowNumber, encodingName, splitSize);
+ rowDelimiter, skipHeaderRowNumber, encodingName,
fileSplitSize);
}
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/local/LocalFileSourceTest.java
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/local/LocalFileSourceTest.java
new file mode 100644
index 0000000000..379f2303a5
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/local/LocalFileSourceTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.connectors.seatunnel.file.local;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import
org.apache.seatunnel.connectors.seatunnel.file.config.ArchiveCompressFormat;
+import org.apache.seatunnel.connectors.seatunnel.file.config.CompressFormat;
+import
org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSourceOptions;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
+import
org.apache.seatunnel.connectors.seatunnel.file.local.source.split.LocalFileAccordingToSplitSizeSplitStrategy;
+import
org.apache.seatunnel.connectors.seatunnel.file.local.source.split.LocalFileSplitStrategyFactory;
+import
org.apache.seatunnel.connectors.seatunnel.file.source.split.DefaultFileSplitStrategy;
+import
org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSplitStrategy;
+import
org.apache.seatunnel.connectors.seatunnel.file.source.split.ParquetFileSplitStrategy;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class LocalFileSourceTest {
+
+ @Test
+ void testInitFileSplitStrategy() {
+ // test orc
+ Map<String, Object> map = new HashMap<>();
+ map.put(FileBaseSourceOptions.FILE_FORMAT_TYPE.key(), FileFormat.ORC);
+ map.put(FileBaseSourceOptions.ENABLE_FILE_SPLIT.key(), true);
+ FileSplitStrategy fileSplitStrategy =
+
LocalFileSplitStrategyFactory.initFileSplitStrategy(ReadonlyConfig.fromMap(map));
+ Assertions.assertInstanceOf(DefaultFileSplitStrategy.class,
fileSplitStrategy);
+ // test text, no split
+ Map<String, Object> map1 = new HashMap<>();
+ map1.put(FileBaseSourceOptions.FILE_FORMAT_TYPE.key(),
FileFormat.TEXT);
+ fileSplitStrategy =
+
LocalFileSplitStrategyFactory.initFileSplitStrategy(ReadonlyConfig.fromMap(map1));
+ Assertions.assertInstanceOf(DefaultFileSplitStrategy.class,
fileSplitStrategy);
+ // test text, split
+ Map<String, Object> map2 = new HashMap<>();
+ map2.put(FileBaseSourceOptions.FILE_FORMAT_TYPE.key(),
FileFormat.TEXT);
+ map2.put(FileBaseSourceOptions.ENABLE_FILE_SPLIT.key(), true);
+ fileSplitStrategy =
+
LocalFileSplitStrategyFactory.initFileSplitStrategy(ReadonlyConfig.fromMap(map2));
+ Assertions.assertInstanceOf(
+ LocalFileAccordingToSplitSizeSplitStrategy.class,
fileSplitStrategy);
+ // test csv, split
+ Map<String, Object> map3 = new HashMap<>();
+ map3.put(FileBaseSourceOptions.FILE_FORMAT_TYPE.key(), FileFormat.CSV);
+ map3.put(FileBaseSourceOptions.ENABLE_FILE_SPLIT.key(), true);
+ fileSplitStrategy =
+
LocalFileSplitStrategyFactory.initFileSplitStrategy(ReadonlyConfig.fromMap(map3));
+ Assertions.assertInstanceOf(
+ LocalFileAccordingToSplitSizeSplitStrategy.class,
fileSplitStrategy);
+ // test json, split
+ Map<String, Object> map4 = new HashMap<>();
+ map4.put(FileBaseSourceOptions.FILE_FORMAT_TYPE.key(),
FileFormat.JSON);
+ map4.put(FileBaseSourceOptions.ENABLE_FILE_SPLIT.key(), true);
+ fileSplitStrategy =
+
LocalFileSplitStrategyFactory.initFileSplitStrategy(ReadonlyConfig.fromMap(map4));
+ Assertions.assertInstanceOf(
+ LocalFileAccordingToSplitSizeSplitStrategy.class,
fileSplitStrategy);
+ // test parquet, split
+ Map<String, Object> map5 = new HashMap<>();
+ map5.put(FileBaseSourceOptions.FILE_FORMAT_TYPE.key(),
FileFormat.PARQUET);
+ map5.put(FileBaseSourceOptions.ENABLE_FILE_SPLIT.key(), true);
+ fileSplitStrategy =
+
LocalFileSplitStrategyFactory.initFileSplitStrategy(ReadonlyConfig.fromMap(map5));
+ Assertions.assertInstanceOf(ParquetFileSplitStrategy.class,
fileSplitStrategy);
+ // test compress 1
+ Map<String, Object> map6 = new HashMap<>();
+ map6.put(FileBaseSourceOptions.FILE_FORMAT_TYPE.key(),
FileFormat.PARQUET);
+ map6.put(FileBaseSourceOptions.ENABLE_FILE_SPLIT.key(), true);
+ map6.put(FileBaseSourceOptions.COMPRESS_CODEC.key(),
CompressFormat.LZO);
+ map6.put(FileBaseSourceOptions.ARCHIVE_COMPRESS_CODEC.key(),
ArchiveCompressFormat.NONE);
+ fileSplitStrategy =
+
LocalFileSplitStrategyFactory.initFileSplitStrategy(ReadonlyConfig.fromMap(map6));
+ Assertions.assertInstanceOf(DefaultFileSplitStrategy.class,
fileSplitStrategy);
+ // test compress 2
+ Map<String, Object> map7 = new HashMap<>();
+ map7.put(FileBaseSourceOptions.FILE_FORMAT_TYPE.key(),
FileFormat.PARQUET);
+ map7.put(FileBaseSourceOptions.ENABLE_FILE_SPLIT.key(), true);
+ map7.put(FileBaseSourceOptions.COMPRESS_CODEC.key(),
CompressFormat.NONE);
+ map7.put(FileBaseSourceOptions.ARCHIVE_COMPRESS_CODEC.key(),
ArchiveCompressFormat.NONE);
+ fileSplitStrategy =
+
LocalFileSplitStrategyFactory.initFileSplitStrategy(ReadonlyConfig.fromMap(map7));
+ Assertions.assertInstanceOf(ParquetFileSplitStrategy.class,
fileSplitStrategy);
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java
index e29448a1b7..bc1fb26d93 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java
@@ -370,6 +370,7 @@ public class LocalFileIT extends TestSuiteBase {
helper.execute("/parquet/fake_to_local_file_parquet.conf");
// test read local parquet file
helper.execute("/parquet/local_file_parquet_to_assert.conf");
+
helper.execute("/parquet/local_file_parquet_enable_split_to_assert.conf");
// test read local parquet file with projection
helper.execute("/parquet/local_file_parquet_projection_to_assert.conf");
// test read filtered local file
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/parquet/local_file_parquet_enable_split_to_assert.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/parquet/local_file_parquet_enable_split_to_assert.conf
new file mode 100644
index 0000000000..6baece1205
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/parquet/local_file_parquet_enable_split_to_assert.conf
@@ -0,0 +1,98 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+
+ # You can set spark configuration here
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ LocalFile {
+ path = "/seatunnel/read/parquet"
+ file_format_type = "parquet"
+ enable_file_split = true
+ file_split_size = 3
+ plugin_output = "fake"
+ }
+}
+
+sink {
+ Assert {
+ rules {
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 5
+ }
+ ],
+ field_rules = [
+ {
+ field_name = c_string
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = c_boolean
+ field_type = boolean
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = c_double
+ field_type = double
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = name
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = hobby
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
\ No newline at end of file