This is an automated email from the ASF dual-hosted git repository.
corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 37999ea5bb [Feature][Connector-V2] Enable file split for S3File source
(#10450)
37999ea5bb is described below
commit 37999ea5bbac070163b4c9f9e322781fa940a5ee
Author: yzeng1618 <[email protected]>
AuthorDate: Wed Mar 11 17:14:45 2026 +0800
[Feature][Connector-V2] Enable file split for S3File source (#10450)
---
docs/en/connectors/source/S3File.md | 28 ++++-
docs/zh/connectors/source/S3File.md | 27 +++++
.../seatunnel/file/config/FileBaseOptions.java | 4 +-
.../file/source/reader/CsvReadStrategy.java | 19 ++-
.../file/source/reader/TextReadStrategy.java | 5 +-
.../split/AccordingToSplitSizeSplitStrategy.java | 4 +-
.../source/split/FileSplitStrategyFactory.java | 34 +++++-
.../source/split/ParquetFileSplitStrategy.java | 15 ++-
.../reader/ReadStrategySplitFallbackTest.java | 132 +++++++++++++++++++++
.../source/split/FileSplitStrategyFactoryTest.java | 88 ++++++++++++++
.../seatunnel/file/s3/source/S3FileSource.java | 6 +-
.../file/s3/source/S3FileSourceFactory.java | 12 ++
.../seatunnel/file/s3/S3FileFactoryTest.java | 71 +++++++++++
.../e2e/connector/file/s3/S3FileWithFilterIT.java | 22 ++++
.../test/resources/text/e2e_split_with_header.txt | 6 +
.../text/s3_file_text_enable_split_to_assert.conf | 79 ++++++++++++
16 files changed, 532 insertions(+), 20 deletions(-)
diff --git a/docs/en/connectors/source/S3File.md
b/docs/en/connectors/source/S3File.md
index 36b3a8590f..6446dc224a 100644
--- a/docs/en/connectors/source/S3File.md
+++ b/docs/en/connectors/source/S3File.md
@@ -215,6 +215,8 @@ If you assign file type to `parquet` `orc`, schema option
not required, connecto
| csv_use_header_line | boolean | no | false
| Whether to use the header line to parse the
file, only used when the file_format is `csv` and the file contains a header
line that matches RFC 4180
[...]
| compress_codec | string | no | none
|
[...]
| archive_compress_codec | string | no | none
|
[...]
+| enable_file_split | boolean | no | false
| Turn on logical file split to improve
parallelism for huge files. Only supported for `text`/`csv`/`json`/`parquet`
and non-compressed format.
|
+| file_split_size | long | no | 134217728
| Split size in bytes when
`enable_file_split=true`. For `text`/`csv`/`json`, the split end will be
aligned to the next `row_delimiter`. For `parquet`, the split unit is RowGroup
and will never break a RowGroup.
|
| encoding | string | no | UTF-8
|
[...]
| null_format | string | no | -
| Only used when file_format_type is text.
null_format to define which strings can be represented as null. e.g: `\N`
[...]
| binary_chunk_size | int | no | 1024
| Only used when file_format_type is binary.
The chunk size (in bytes) for reading binary files. Default is 1024 bytes.
Larger values may improve performance for large files but use more memory.
[...]
@@ -311,6 +313,30 @@ The result of this example matching is:
/data/seatunnel/20241005/old_data.csv
```
+### enable_file_split [boolean]
+
+Turn on the file splitting function; the default is false. It can be enabled
when the file type is csv, text, json, or parquet and the file is not compressed.
+
+- `text`/`csv`/`json`: split by `file_split_size` and align to the next
`row_delimiter` to avoid breaking records.
+- `parquet`: split by RowGroup (logical split), never breaks a RowGroup.
+
+**Recommendations**
+- Enable when reading a few large files and you want higher read parallelism.
+- Disable when reading many small files, or when parallelism is low (splitting
adds overhead).
+
+**Limitations**
+- Not supported for compressed files (`compress_codec` != `none`) or archive
files (`archive_compress_codec` != `none`) — it will fall back to non-splitting
and emit a warning log.
+- For `text`/`csv`/`json`, actual split size may be larger than
`file_split_size` because the split end is aligned to the next `row_delimiter`.
+- For `json`, splitting is only supported for JSON Lines (one JSON object per
line).
+- When splitting is enabled, global record order is not guaranteed because
splits can be processed in parallel. Set `parallelism=1` if strict ordering is
required.
+
+### file_split_size [long]
+
+File split size in bytes. It takes effect only when the `enable_file_split`
parameter is true. The default value is 128MB, which is 134217728
bytes.
+
+**Tuning**
+- Start with the default (128MB). Decrease it if parallelism is
under-utilized; increase it if the number of splits is too large.
+
### compress_codec [string]
The compress codec of files and the details that supported as the following
shown:
@@ -504,4 +530,4 @@ sink {
## Changelog
-<ChangeLog />
\ No newline at end of file
+<ChangeLog />
diff --git a/docs/zh/connectors/source/S3File.md
b/docs/zh/connectors/source/S3File.md
index 4c20504f1c..75de5ef656 100644
--- a/docs/zh/connectors/source/S3File.md
+++ b/docs/zh/connectors/source/S3File.md
@@ -214,6 +214,8 @@ schema {
| xml_use_attr_format | boolean | 否 | -
| 指定是否使用标签属性格式处理数据,仅对XML文件有效。
|
| compress_codec | string | 否 | none
|
|
| archive_compress_codec | string | 否 | none
|
|
+| enable_file_split | boolean | 否 | false
| 开启大文件拆分以提升并行度。仅支持 `text`/`csv`/`json`/`parquet`
且非压缩格式(`compress_codec=none` 且 `archive_compress_codec=none`)。
|
+| file_split_size | long | 否 | 134217728
| `enable_file_split=true`
时生效,单位字节。`text`/`csv`/`json` 按 `file_split_size` 拆分并对齐到下一个
`row_delimiter`;`parquet` 以 RowGroup 为拆分单位,不会切开 RowGroup。
|
| encoding | string | 否 | UTF-8
|
|
| null_format | string | 否 | -
|
仅在file_format_type为text时使用。null_format用于定义哪些字符串可以表示为null。例如:`\N`
|
| binary_chunk_size | int | 否 | 1024
|
仅在file_format_type为binary时使用。读取二进制文件的块大小(以字节为单位)。默认为1024字节。较大的值可能会提高大文件的性能,但会使用更多内存。
|
@@ -298,6 +300,31 @@ abc.*
/data/seatunnel/20241005/old_data.csv
```
+### enable_file_split [boolean]
+
+开启大文件拆分功能,默认 false。仅支持 `csv`/`text`/`json`/`parquet`
且非压缩格式(`compress_codec=none` 且 `archive_compress_codec=none`)。
+
+- `text`/`csv`/`json`:按 `file_split_size` 拆分并对齐到下一个
`row_delimiter`,避免切开一行/一条记录。
+- `parquet`:以 RowGroup 为逻辑拆分单位,不会切开 RowGroup。
+
+**使用建议**
+- 适合:读取少量大文件,并希望通过更高并行度提升吞吐。
+- 不建议:读取大量小文件,或并行度较低的场景(拆分会带来额外的枚举/调度开销)。
+
+**限制说明**
+- 不支持压缩文件(`compress_codec` != `none`)或归档文件(`archive_compress_codec` !=
`none`),会自动回退为不拆分,并打印 WARN 日志提示。
+- 对于 `text`/`csv`/`json`,实际 split 的大小可能略大于 `file_split_size`(因为需要对齐到下一个
`row_delimiter`)。
+- 对于 `json`,仅支持 JSON Lines(每行一个 JSON 对象)的切分读取。
+- 启用切分后,数据全局顺序不保证(split 可能并行处理导致输出顺序交错)。如需严格有序,请设置 `parallelism=1` 或关闭切分。
+
+### file_split_size [long]
+
+`enable_file_split=true` 时生效,单位字节。默认 128MB(134217728)。
+
+**调优建议**
+- 建议从默认值(128MB)开始:如果并行度未充分利用可适当调小;如果 split 数量过多可适当调大。
+- 经验公式:`file_split_size ≈ file_size / 期望并行度`。
+
### compress_codec [string]
文件的压缩编解码器,支持的详细信息如下所示:
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileBaseOptions.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileBaseOptions.java
index b88b64b31d..b026b92c2e 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileBaseOptions.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileBaseOptions.java
@@ -162,5 +162,7 @@ public class FileBaseOptions extends ConnectorCommonOptions
{
.longType()
.defaultValue(128 * 1024 * 1024L)
.withDescription(
- "File split size, which can be filled in when the
enable_file_split parameter is true. The unit is the number of bytes. The
default value is the number of bytes of 128MB, which is 128*1024*1024.");
+ "File split size in bytes when
enable_file_split=true. Must be greater than 0. "
+ + "For text-like formats, the split end
will be aligned to the next row_delimiter. "
+ + "Default is 128MB (128*1024*1024).");
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/CsvReadStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/CsvReadStrategy.java
index 665c2670ed..a049d9fa55 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/CsvReadStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/CsvReadStrategy.java
@@ -106,13 +106,14 @@ public class CsvReadStrategy extends AbstractReadStrategy
{
currentFileName,
split.getStart(),
split.getLength());
+ final boolean useSplitRead = isSplitReadEnabled(split);
try (BOMInputStream bomIn = new
BOMInputStream(wrapInputStream(inputStream, split));
BufferedReader reader =
new BufferedReader(new InputStreamReader(bomIn,
getCharset(bomIn)));
CSVParser csvParser = new CSVParser(reader,
getCSVFormat(split))) {
// skip lines
- // if enableSplitFile is true,no need to skip
- if (!enableSplitFile) {
+ // if split range is used, no need to skip
+ if (!useSplitRead) {
for (int i = 0; i < skipHeaderNumber; i++) {
if (reader.readLine() == null) {
throw new IOException(
@@ -197,7 +198,7 @@ public class CsvReadStrategy extends AbstractReadStrategy {
break;
}
// rebuild inputStream
- if (enableSplitFile && split.getLength() > -1) {
+ if (isSplitReadEnabled(split)) {
resultStream = safeSlice(resultStream, split.getStart(),
split.getLength());
}
return resultStream;
@@ -209,6 +210,10 @@ public class CsvReadStrategy extends AbstractReadStrategy {
: Charset.forName(bomIn.getBOM().getCharsetName());
}
+ private boolean isSplitReadEnabled(FileSourceSplit split) {
+ return enableSplitFile && split.getLength() > -1;
+ }
+
private CSVFormat getCSVFormat(FileSourceSplit split) {
String quoteChar =
readonlyConfig.get(FileBaseSourceOptions.QUOTE_CHAR);
String escapeChar =
readonlyConfig.get(FileBaseSourceOptions.ESCAPE_CHAR);
@@ -221,8 +226,9 @@ public class CsvReadStrategy extends AbstractReadStrategy {
builder.setEscape(escapeChar.charAt(0));
}
CSVFormat csvFormat = builder.build();
- // if enableSplitFile is true,no need to skip
- if (firstLineAsHeader && (!enableSplitFile || split.getStart() == 0)) {
+ final boolean useSplitRead = isSplitReadEnabled(split);
+ // if split range is used, header should only be read in the first
split
+ if (firstLineAsHeader && (!useSplitRead || split.getStart() == 0)) {
csvFormat = csvFormat.withFirstRecordAsHeader();
}
return csvFormat;
@@ -230,7 +236,8 @@ public class CsvReadStrategy extends AbstractReadStrategy {
private List<String> getHeaders(CSVParser csvParser, FileSourceSplit
split) {
List<String> headers;
- if (firstLineAsHeader && (!enableSplitFile || split.getStart() == 0)) {
+ final boolean useSplitRead = isSplitReadEnabled(split);
+ if (firstLineAsHeader && (!useSplitRead || split.getStart() == 0)) {
headers = new ArrayList<>(csvParser.getHeaderNames());
} else {
headers =
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
index 074054c7e8..fa45521fbb 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
@@ -205,7 +205,8 @@ public class TextReadStrategy extends AbstractReadStrategy {
break;
}
// rebuild inputStream
- if (enableSplitFile && split.getLength() > -1) {
+ final boolean useSplitRead = enableSplitFile && split.getLength() > -1;
+ if (useSplitRead) {
actualInputStream = safeSlice(inputStream, split.getStart(),
split.getLength());
}
try (BufferedReader reader =
@@ -220,7 +221,7 @@ public class TextReadStrategy extends AbstractReadStrategy {
}
};
StreamLineSplitter splitter;
- if (enableSplitFile) {
+ if (useSplitRead) {
splitter = new StreamLineSplitter(rowDelimiter, 0,
lineProcessor);
} else {
splitter = new StreamLineSplitter(rowDelimiter,
skipHeaderNumber, lineProcessor);
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/AccordingToSplitSizeSplitStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/AccordingToSplitSizeSplitStrategy.java
index ca9a60e979..1dafa51b7e 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/AccordingToSplitSizeSplitStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/AccordingToSplitSizeSplitStrategy.java
@@ -65,7 +65,9 @@ public class AccordingToSplitSizeSplitStrategy implements
FileSplitStrategy, Clo
if (splitSize <= 0) {
throw new SeaTunnelRuntimeException(
FileConnectorErrorCode.FILE_SPLIT_SIZE_ILLEGAL,
- "SplitSizeBytes must be greater than 0");
+ String.format(
+ "file_split_size must be greater than 0 when
enable_file_split=true, but got: %d",
+ splitSize));
}
if (rowDelimiter == null || rowDelimiter.isEmpty()) {
throw new SeaTunnelRuntimeException(
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSplitStrategyFactory.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSplitStrategyFactory.java
index a801927e9d..c0d6bb42e5 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSplitStrategyFactory.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSplitStrategyFactory.java
@@ -17,16 +17,21 @@
package org.apache.seatunnel.connectors.seatunnel.file.source.split;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
import
org.apache.seatunnel.connectors.seatunnel.file.config.ArchiveCompressFormat;
import org.apache.seatunnel.connectors.seatunnel.file.config.CompressFormat;
import
org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSourceOptions;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
+
+import lombok.extern.slf4j.Slf4j;
import java.util.Objects;
import static
org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSourceOptions.DEFAULT_ROW_DELIMITER;
+@Slf4j
public class FileSplitStrategyFactory {
public static FileSplitStrategy initFileSplitStrategy(
@@ -34,12 +39,24 @@ public class FileSplitStrategyFactory {
if (!readonlyConfig.get(FileBaseSourceOptions.ENABLE_FILE_SPLIT)) {
return new DefaultFileSplitStrategy();
}
- if
(!readonlyConfig.get(FileBaseSourceOptions.FILE_FORMAT_TYPE).supportFileSplit())
{
+ FileFormat fileFormat =
readonlyConfig.get(FileBaseSourceOptions.FILE_FORMAT_TYPE);
+ if (!fileFormat.supportFileSplit()) {
+ log.warn(
+ "enable_file_split=true but file_format_type={} does not
support file split. "
+ + "Falling back to non-splitting mode.",
+ fileFormat);
return new DefaultFileSplitStrategy();
}
- if (readonlyConfig.get(FileBaseSourceOptions.COMPRESS_CODEC) !=
CompressFormat.NONE
- ||
readonlyConfig.get(FileBaseSourceOptions.ARCHIVE_COMPRESS_CODEC)
- != ArchiveCompressFormat.NONE) {
+ CompressFormat compressCodec =
readonlyConfig.get(FileBaseSourceOptions.COMPRESS_CODEC);
+ ArchiveCompressFormat archiveCompressCodec =
+
readonlyConfig.get(FileBaseSourceOptions.ARCHIVE_COMPRESS_CODEC);
+ if (compressCodec != CompressFormat.NONE
+ || archiveCompressCodec != ArchiveCompressFormat.NONE) {
+ log.warn(
+ "enable_file_split=true but compress_codec={} or
archive_compress_codec={} is not NONE. "
+ + "Falling back to non-splitting mode.",
+ compressCodec,
+ archiveCompressCodec);
return new DefaultFileSplitStrategy();
}
@@ -47,7 +64,14 @@ public class FileSplitStrategyFactory {
hadoopConf, "hadoopConf must not be null when file split is
enabled");
long fileSplitSize =
readonlyConfig.get(FileBaseSourceOptions.FILE_SPLIT_SIZE);
- if (FileFormat.PARQUET ==
readonlyConfig.get(FileBaseSourceOptions.FILE_FORMAT_TYPE)) {
+ if (fileSplitSize <= 0) {
+ throw new SeaTunnelRuntimeException(
+ FileConnectorErrorCode.FILE_SPLIT_SIZE_ILLEGAL,
+ String.format(
+ "file_split_size must be greater than 0 when
enable_file_split=true, but got: %d",
+ fileSplitSize));
+ }
+ if (FileFormat.PARQUET == fileFormat) {
return new ParquetFileSplitStrategy(fileSplitSize, hadoopConf);
}
String rowDelimiter =
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/ParquetFileSplitStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/ParquetFileSplitStrategy.java
index 93a12d3213..b89bddfa53 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/ParquetFileSplitStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/ParquetFileSplitStrategy.java
@@ -58,7 +58,9 @@ public class ParquetFileSplitStrategy implements
FileSplitStrategy, Closeable {
if (splitSizeBytes <= 0) {
throw new SeaTunnelRuntimeException(
FileConnectorErrorCode.FILE_SPLIT_SIZE_ILLEGAL,
- "SplitSizeBytes must be greater than 0");
+ String.format(
+ "file_split_size must be greater than 0 when
enable_file_split=true, but got: %d",
+ splitSizeBytes));
}
this.splitSizeBytes = splitSizeBytes;
this.hadoopFileSystemProxy = null;
@@ -68,7 +70,9 @@ public class ParquetFileSplitStrategy implements
FileSplitStrategy, Closeable {
if (splitSizeBytes <= 0) {
throw new SeaTunnelRuntimeException(
FileConnectorErrorCode.FILE_SPLIT_SIZE_ILLEGAL,
- "SplitSizeBytes must be greater than 0");
+ String.format(
+ "file_split_size must be greater than 0 when
enable_file_split=true, but got: %d",
+ splitSizeBytes));
}
this.splitSizeBytes = splitSizeBytes;
this.hadoopFileSystemProxy = new HadoopFileSystemProxy(hadoopConf);
@@ -79,7 +83,12 @@ public class ParquetFileSplitStrategy implements
FileSplitStrategy, Closeable {
try {
return splitByRowGroups(tableId, filePath,
readRowGroups(filePath));
} catch (IOException e) {
- throw new
SeaTunnelRuntimeException(FileConnectorErrorCode.FILE_SPLIT_FAIL, e);
+ throw new SeaTunnelRuntimeException(
+ FileConnectorErrorCode.FILE_SPLIT_FAIL,
+ String.format(
+ "Split parquet file for [%s] failed, cause=%s: %s",
+ filePath, e.getClass().getSimpleName(),
e.getMessage()),
+ e);
}
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategySplitFallbackTest.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategySplitFallbackTest.java
new file mode 100644
index 0000000000..c791442f4a
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategySplitFallbackTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.source.reader;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import
org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSourceOptions;
+import
org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSourceSplit;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.io.ByteArrayInputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class ReadStrategySplitFallbackTest {
+
+ private static final class ListCollector implements
Collector<SeaTunnelRow> {
+ private final List<SeaTunnelRow> rows;
+ private final Object checkpointLock = new Object();
+
+ private ListCollector(List<SeaTunnelRow> rows) {
+ this.rows = rows;
+ }
+
+ @Override
+ public void collect(SeaTunnelRow record) {
+ rows.add(record);
+ }
+
+ @Override
+ public Object getCheckpointLock() {
+ return checkpointLock;
+ }
+ }
+
+ @Test
+ void
testTextReadStrategyShouldSkipHeaderWhenEnableSplitButNoRangeInSplit() throws
Exception {
+ Map<String, Object> configMap = new HashMap<>();
+ configMap.put(FileBaseSourceOptions.FILE_PATH.key(), "/tmp/test");
+ configMap.put(FileBaseSourceOptions.ENABLE_FILE_SPLIT.key(), true);
+ configMap.put(FileBaseSourceOptions.SKIP_HEADER_ROW_NUMBER.key(), 1L);
+ Config pluginConfig = ConfigFactory.parseMap(configMap);
+
+ SeaTunnelRowType rowType =
+ new SeaTunnelRowType(
+ new String[] {"name"}, new SeaTunnelDataType[]
{BasicType.STRING_TYPE});
+ CatalogTable catalogTable = CatalogTableUtil.getCatalogTable("test",
rowType);
+
+ List<SeaTunnelRow> rows = new ArrayList<>();
+ ListCollector collector = new ListCollector(rows);
+ FileSourceSplit split = new FileSourceSplit("test",
"/tmp/test/e2e.txt");
+
+ try (TextReadStrategy strategy = new TextReadStrategy()) {
+ strategy.setPluginConfig(pluginConfig);
+ strategy.setCatalogTable(catalogTable);
+
+ strategy.readProcess(
+ split,
+ collector,
+ new
ByteArrayInputStream("name\na\n".getBytes(StandardCharsets.UTF_8)),
+ Collections.emptyMap(),
+ "e2e.txt");
+ }
+
+ Assertions.assertEquals(1, rows.size());
+ Assertions.assertEquals("a", rows.get(0).getField(0));
+ }
+
+ @Test
+ void testCsvReadStrategyShouldUseHeaderWhenEnableSplitButNoRangeInSplit()
throws Exception {
+ Map<String, Object> configMap = new HashMap<>();
+ configMap.put(FileBaseSourceOptions.FILE_PATH.key(), "/tmp/test");
+ configMap.put(FileBaseSourceOptions.ENABLE_FILE_SPLIT.key(), true);
+ configMap.put(FileBaseSourceOptions.CSV_USE_HEADER_LINE.key(), true);
+ Config pluginConfig = ConfigFactory.parseMap(configMap);
+
+ SeaTunnelRowType rowType =
+ new SeaTunnelRowType(
+ new String[] {"id", "name"},
+ new SeaTunnelDataType[] {BasicType.INT_TYPE,
BasicType.STRING_TYPE});
+ CatalogTable catalogTable = CatalogTableUtil.getCatalogTable("test",
rowType);
+
+ List<SeaTunnelRow> rows = new ArrayList<>();
+ ListCollector collector = new ListCollector(rows);
+ FileSourceSplit split = new FileSourceSplit("test",
"/tmp/test/e2e.csv");
+
+ try (CsvReadStrategy strategy = new CsvReadStrategy()) {
+ strategy.setPluginConfig(pluginConfig);
+ strategy.setCatalogTable(catalogTable);
+
+ strategy.readProcess(
+ split,
+ collector,
+ new
ByteArrayInputStream("id,name\n1,a\n".getBytes(StandardCharsets.UTF_8)),
+ Collections.emptyMap(),
+ "e2e.csv");
+ }
+
+ Assertions.assertEquals(1, rows.size());
+ Assertions.assertEquals(1, rows.get(0).getField(0));
+ Assertions.assertEquals("a", rows.get(0).getField(1));
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSplitStrategyFactoryTest.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSplitStrategyFactoryTest.java
new file mode 100644
index 0000000000..7fed69435a
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSplitStrategyFactoryTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.connectors.seatunnel.file.source.split;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+import
org.apache.seatunnel.connectors.seatunnel.file.config.ArchiveCompressFormat;
+import org.apache.seatunnel.connectors.seatunnel.file.config.CompressFormat;
+import
org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSourceOptions;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+class FileSplitStrategyFactoryTest {
+
+ @Test
+ void shouldThrowWhenSplitSizeIsNonPositive() {
+ Map<String, Object> configMap = new HashMap<>();
+ configMap.put(FileBaseSourceOptions.ENABLE_FILE_SPLIT.key(), true);
+ configMap.put(FileBaseSourceOptions.FILE_FORMAT_TYPE.key(),
FileFormat.TEXT);
+ configMap.put(FileBaseSourceOptions.COMPRESS_CODEC.key(),
CompressFormat.NONE);
+ configMap.put(
+ FileBaseSourceOptions.ARCHIVE_COMPRESS_CODEC.key(),
ArchiveCompressFormat.NONE);
+ configMap.put(FileBaseSourceOptions.FILE_SPLIT_SIZE.key(), 0L);
+
+ ReadonlyConfig readonlyConfig = ReadonlyConfig.fromMap(configMap);
+ HadoopConf hadoopConf = new HadoopConf("file:///");
+
+ SeaTunnelRuntimeException exception =
+ Assertions.assertThrows(
+ SeaTunnelRuntimeException.class,
+ () ->
+ FileSplitStrategyFactory.initFileSplitStrategy(
+ readonlyConfig, hadoopConf));
+ Assertions.assertEquals(
+ FileConnectorErrorCode.FILE_SPLIT_SIZE_ILLEGAL,
exception.getSeaTunnelErrorCode());
+
Assertions.assertTrue(exception.getMessage().contains("file_split_size"));
+ }
+
+ @Test
+ void shouldFallbackToDefaultWhenCompressed() {
+ Map<String, Object> configMap = new HashMap<>();
+ configMap.put(FileBaseSourceOptions.ENABLE_FILE_SPLIT.key(), true);
+ configMap.put(FileBaseSourceOptions.FILE_FORMAT_TYPE.key(),
FileFormat.TEXT);
+ configMap.put(FileBaseSourceOptions.COMPRESS_CODEC.key(),
CompressFormat.LZO);
+ configMap.put(FileBaseSourceOptions.FILE_SPLIT_SIZE.key(), 0L);
+
+ ReadonlyConfig readonlyConfig = ReadonlyConfig.fromMap(configMap);
+
+ FileSplitStrategy strategy =
+ FileSplitStrategyFactory.initFileSplitStrategy(readonlyConfig,
null);
+ Assertions.assertInstanceOf(DefaultFileSplitStrategy.class, strategy);
+ }
+
+ @Test
+ void shouldFallbackToDefaultWhenFormatNotSupportSplit() {
+ Map<String, Object> configMap = new HashMap<>();
+ configMap.put(FileBaseSourceOptions.ENABLE_FILE_SPLIT.key(), true);
+ configMap.put(FileBaseSourceOptions.FILE_FORMAT_TYPE.key(),
FileFormat.ORC);
+ configMap.put(FileBaseSourceOptions.FILE_SPLIT_SIZE.key(), 0L);
+
+ ReadonlyConfig readonlyConfig = ReadonlyConfig.fromMap(configMap);
+
+ FileSplitStrategy strategy =
+ FileSplitStrategyFactory.initFileSplitStrategy(readonlyConfig,
null);
+ Assertions.assertInstanceOf(DefaultFileSplitStrategy.class, strategy);
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSource.java
b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSource.java
index 8663d95dfd..4e64c0c0b7 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSource.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSource.java
@@ -28,7 +28,11 @@ import java.util.List;
public class S3FileSource extends BaseMultipleTableFileSource {
public S3FileSource(ReadonlyConfig readonlyConfig, List<CatalogTable>
catalogTablesFromConfig) {
- super(new MultipleTableS3FileSourceConfig(readonlyConfig,
catalogTablesFromConfig));
+ this(new MultipleTableS3FileSourceConfig(readonlyConfig,
catalogTablesFromConfig));
+ }
+
+ private S3FileSource(MultipleTableS3FileSourceConfig sourceConfig) {
+ super(sourceConfig, initFileSplitStrategy(sourceConfig));
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSourceFactory.java
b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSourceFactory.java
index 66af2fd81d..303ec74d39 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSourceFactory.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSourceFactory.java
@@ -93,6 +93,18 @@ public class S3FileSourceFactory implements
TableSourceFactory {
Arrays.asList(
FileFormat.TEXT, FileFormat.JSON,
FileFormat.CSV, FileFormat.XML),
FileBaseSourceOptions.ENCODING)
+ .conditional(
+ FileBaseSourceOptions.FILE_FORMAT_TYPE,
+ Arrays.asList(
+ FileFormat.TEXT,
+ FileFormat.JSON,
+ FileFormat.CSV,
+ FileFormat.PARQUET),
+ FileBaseSourceOptions.ENABLE_FILE_SPLIT)
+ .conditional(
+ FileBaseSourceOptions.ENABLE_FILE_SPLIT,
+ Boolean.TRUE,
+ FileBaseSourceOptions.FILE_SPLIT_SIZE)
.optional(FileBaseSourceOptions.PARSE_PARTITION_FROM_PATH)
.optional(FileBaseSourceOptions.DATE_FORMAT_LEGACY)
.optional(FileBaseSourceOptions.DATETIME_FORMAT_LEGACY)
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/s3/S3FileFactoryTest.java
b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/s3/S3FileFactoryTest.java
index f52a16e264..393f8dc7c3 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/s3/S3FileFactoryTest.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/s3/S3FileFactoryTest.java
@@ -17,6 +17,12 @@
package org.apache.seatunnel.connectors.seatunnel.file.s3;
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.util.Condition;
+import org.apache.seatunnel.api.configuration.util.Expression;
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.configuration.util.RequiredOption;
+import
org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSourceOptions;
import
org.apache.seatunnel.connectors.seatunnel.file.s3.sink.S3FileSinkFactory;
import
org.apache.seatunnel.connectors.seatunnel.file.s3.source.S3FileSourceFactory;
@@ -30,4 +36,69 @@ class S3FileFactoryTest {
Assertions.assertNotNull((new S3FileSourceFactory()).optionRule());
Assertions.assertNotNull((new S3FileSinkFactory()).optionRule());
}
+
+ @Test
+ void sourceOptionRuleShouldContainFileSplitOptions() {
+ OptionRule rule = new S3FileSourceFactory().optionRule();
+ Assertions.assertTrue(
+ optionRuleContains(rule,
FileBaseSourceOptions.ENABLE_FILE_SPLIT),
+ "S3File source optionRule should include enable_file_split");
+ Assertions.assertTrue(
+ optionRuleContains(rule,
FileBaseSourceOptions.FILE_SPLIT_SIZE),
+ "S3File source optionRule should include file_split_size");
+
+ Assertions.assertTrue(
+ hasConditionalRequiredOption(
+ rule,
+ FileBaseSourceOptions.FILE_FORMAT_TYPE,
+ FileBaseSourceOptions.ENABLE_FILE_SPLIT),
+ "S3File source optionRule should expose enable_file_split for
split-capable formats");
+
+ Assertions.assertTrue(
+ hasConditionalRequiredOption(
+ rule,
+ FileBaseSourceOptions.ENABLE_FILE_SPLIT,
+ FileBaseSourceOptions.FILE_SPLIT_SIZE),
+ "S3File source optionRule should expose file_split_size when
enable_file_split=true");
+ }
+
+ private static boolean optionRuleContains(OptionRule rule, Option<?>
option) {
+ if (rule.getOptionalOptions().contains(option)) {
+ return true;
+ }
+ return rule.getRequiredOptions().stream().anyMatch(ro ->
ro.getOptions().contains(option));
+ }
+
+ private static boolean hasConditionalRequiredOption(
+ OptionRule rule, Option<?> conditionalOption, Option<?>
requiredOption) {
+ return rule.getRequiredOptions().stream()
+ .filter(ro -> ro instanceof
RequiredOption.ConditionalRequiredOptions)
+ .map(ro -> (RequiredOption.ConditionalRequiredOptions) ro)
+ .anyMatch(
+ cro ->
+ expressionContainsOption(cro.getExpression(),
conditionalOption)
+ &&
cro.getRequiredOption().contains(requiredOption));
+ }
+
+ private static boolean expressionContainsOption(Expression expression,
Option<?> option) {
+ Expression currentExpression = expression;
+ while (currentExpression != null) {
+ if (conditionContainsOption(currentExpression.getCondition(),
option)) {
+ return true;
+ }
+ currentExpression = currentExpression.getNext();
+ }
+ return false;
+ }
+
+ private static boolean conditionContainsOption(Condition<?> condition,
Option<?> option) {
+ Condition<?> currentCondition = condition;
+ while (currentCondition != null) {
+ if (currentCondition.getOption().equals(option)) {
+ return true;
+ }
+ currentCondition = currentCondition.getNext();
+ }
+ return false;
+ }
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-s3-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/s3/S3FileWithFilterIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-s3-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/s3/S3FileWithFilterIT.java
index 546baf06a3..5f5e587aa6 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-s3-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/s3/S3FileWithFilterIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-s3-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/s3/S3FileWithFilterIT.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.e2e.connector.file.s3;
import org.apache.seatunnel.e2e.common.container.seatunnel.SeaTunnelContainer;
import org.apache.seatunnel.e2e.common.util.ContainerUtil;
+import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
@@ -38,6 +39,14 @@ import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.nio.file.Paths;
+/**
+ * MinIO-based S3 E2E test suite for connector-file-s3, covering:
+ *
+ * <ul>
+ * <li>file filter by path/name pattern
+ * <li>logical file split (enable_file_split/file_split_size) for parallel
read
+ * </ul>
+ */
@Slf4j
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class S3FileWithFilterIT extends SeaTunnelContainer {
@@ -79,7 +88,9 @@ public class S3FileWithFilterIT extends SeaTunnelContainer {
}
@Override
+ @AfterAll
public void tearDown() throws Exception {
+ super.tearDown();
if (s3Container != null) {
s3Container.close();
}
@@ -143,4 +154,15 @@ public class S3FileWithFilterIT extends SeaTunnelContainer
{
executeJob("/json/s3_to_access_for_json_name_filter.conf");
Assertions.assertEquals(0, execNameResult.getExitCode());
}
+
+ @Test
+ public void testS3FileTextEnableSplitToAssert() throws IOException,
InterruptedException {
+ S3Utils.uploadTestFiles(
+ "/text/e2e_split_with_header.txt",
+ "/test/seatunnel/read/split/text/e2e_split_with_header.txt",
+ true);
+ Container.ExecResult execResult =
+ executeJob("/text/s3_file_text_enable_split_to_assert.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ }
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-s3-e2e/src/test/resources/text/e2e_split_with_header.txt
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-s3-e2e/src/test/resources/text/e2e_split_with_header.txt
new file mode 100644
index 0000000000..6c9c22a326
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-s3-e2e/src/test/resources/text/e2e_split_with_header.txt
@@ -0,0 +1,6 @@
+name
+a
+b
+c
+d
+e
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-s3-e2e/src/test/resources/text/s3_file_text_enable_split_to_assert.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-s3-e2e/src/test/resources/text/s3_file_text_enable_split_to_assert.conf
new file mode 100644
index 0000000000..5842447fb7
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-s3-e2e/src/test/resources/text/s3_file_text_enable_split_to_assert.conf
@@ -0,0 +1,79 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 2
+ job.mode = "BATCH"
+
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 1
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = "local"
+}
+
+source {
+ S3File {
+ fs.s3a.endpoint = "http://s3:9000"
+ hadoop_s3_properties = {
+ "fs.s3a.path.style.access" = "true"
+ "fs.s3a.statistics.enable" = "false"
+ }
+ fs.s3a.aws.credentials.provider =
"org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
+ access_key = "minioadmin"
+ secret_key = "minioadmin"
+ bucket = "s3a://ws-package"
+ path = "/test/seatunnel/read/split/text"
+ file_format_type = "text"
+
+ enable_file_split = true
+ file_split_size = 5
+
+ skip_header_row_number = 1
+ schema = {
+ fields {
+ name = string
+ }
+ }
+ }
+}
+
+sink {
+ Assert {
+ rules = {
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 5
+ },
+ {
+ rule_type = MIN_ROW
+ rule_value = 5
+ }
+ ]
+ field_rules = [
+ {
+ field_name = "name"
+ field_type = "string"
+ field_value = [
+ { rule_type = NOT_NULL }
+ ]
+ }
+ ]
+ }
+ }
+}