This is an automated email from the ASF dual-hosted git repository.
lidongdai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 4c110c8fdf [Feature][Connector-V2][HdfsFile] Support true large-file
split for parallel read (#10332)
4c110c8fdf is described below
commit 4c110c8fdfe740de3c22bf1cd0036eca7aab7d76
Author: yzeng1618 <[email protected]>
AuthorDate: Thu Jan 29 17:48:27 2026 +0800
[Feature][Connector-V2][HdfsFile] Support true large-file split for
parallel read (#10332)
Co-authored-by: zengyi <[email protected]>
---
docs/en/connectors/source/HdfsFile.md | 26 +++
docs/en/connectors/source/LocalFile.md | 15 +-
docs/zh/connectors/source/HdfsFile.md | 26 +++
docs/zh/connectors/source/LocalFile.md | 13 ++
.../file/exception/FileConnectorErrorCode.java | 5 +-
.../file/source/BaseMultipleTableFileSource.java | 24 +-
.../file/source/reader/AbstractReadStrategy.java | 22 +-
.../split/AccordingToSplitSizeSplitStrategy.java | 242 +++++++++++++++------
.../file/source/split/FileSourceSplit.java | 14 ++
.../source/split/FileSplitStrategyFactory.java} | 23 +-
.../MultipleTableFileSourceSplitEnumerator.java | 68 +++++-
.../split/MultipleTableFileSplitStrategy.java | 72 ++++++
.../source/split/ParquetFileSplitStrategy.java | 74 +++++--
.../source/reader/AbstractReadStrategyTest.java | 81 +++++++
.../split/FileSourceSplitCompatibilityTest.java | 209 ++++++++++++++++++
.../seatunnel/file/hdfs/source/HdfsFileSource.java | 6 +-
.../file/hdfs/source/HdfsFileSourceFactory.java | 12 +
...sFileAccordingToSplitSizeSplitStrategyTest.java | 229 +++++++++++++++++++
.../split/HdfsFileSplitStrategyFactoryTest.java | 123 +++++++++++
.../file/local/source/LocalFileSource.java | 9 +-
...LocalFileAccordingToSplitSizeSplitStrategy.java | 47 ++--
.../seatunnel/file/local/LocalFileSourceTest.java | 61 ++++--
.../file/local/SplitFileStrategyTest.java | 141 ++++++------
.../e2e/connector/file/hdfs/HdfsFileIT.java | 41 ++++
.../resources/hdfs_parquet_split_to_assert.conf | 84 +++++++
.../test/resources/hdfs_text_split_to_assert.conf | 58 +++++
26 files changed, 1503 insertions(+), 222 deletions(-)
diff --git a/docs/en/connectors/source/HdfsFile.md
b/docs/en/connectors/source/HdfsFile.md
index 27e50e40f6..17e9285317 100644
--- a/docs/en/connectors/source/HdfsFile.md
+++ b/docs/en/connectors/source/HdfsFile.md
@@ -88,6 +88,8 @@ Read data from hdfs file system.
| common-options | | no | -
| Source plugin common parameters, please refer to [Source Common
Options](../source-common-options.md) for details.
|
| file_filter_modified_start | string | no | -
| File modification time filter. The connector will filter some files base on
the last modification start time (include start time). The default data format
is `yyyy-MM-dd HH:mm:ss`.
|
| file_filter_modified_end | string | no | -
| File modification time filter. The connector will filter some files base on
the last modification end time (not include end time). The default data format
is `yyyy-MM-dd HH:mm:ss`.
|
+| enable_file_split | boolean | no | false
| Enable logical file splitting to improve read parallelism for large files.
Only supported for `text`/`csv`/`json`/`parquet` in non-compressed format.
|
+| file_split_size | long | no | 134217728
| Split size in bytes when `enable_file_split=true`. For `text`/`csv`/`json`,
the split end will be aligned to the next `row_delimiter`. For `parquet`, the
split unit is RowGroup and will never break a RowGroup.
|
| quote_char | string | no | "
| A single character that encloses CSV fields, allowing fields with commas,
line breaks, or quotes to be read correctly.
|
| escape_char | string | no | -
| A single character that allows the quote or other special characters to
appear inside a CSV field without ending the field.
|
@@ -254,6 +256,30 @@ Only used when `sync_mode=update`. Supported values:
`len_mtime` (default), `che
- `len_mtime`: SKIP only when both `len` and `mtime` are equal, otherwise COPY.
- `checksum`: SKIP only when `len` is equal and Hadoop `getFileChecksum` is
equal, otherwise COPY (only valid when `update_strategy=strict`).
+
+### enable_file_split [boolean]
+
+Enables the file splitting function; the default is false. It can be enabled
when the file type is csv, text, json, or parquet and the file is not compressed.
+
+- `text`/`csv`/`json`: split by `file_split_size` and align to the next
`row_delimiter` to avoid breaking records.
+- `parquet`: split by RowGroup (logical split), never breaks a RowGroup.
+
+**Recommendations**
+- Enable when reading a few large files and you want higher read parallelism.
+- Disable when reading many small files, or when parallelism is low (splitting
adds overhead).
+
+**Limitations**
+- Not supported for compressed files (`compress_codec` != `none`) or archive
files (`archive_compress_codec` != `none`) — it will fall back to non-splitting.
+- For `text`/`csv`/`json`, actual split size may be larger than
`file_split_size` because the split end is aligned to the next `row_delimiter`.
+
+### file_split_size [long]
+
+File split size in bytes. It takes effect only when `enable_file_split` is
true. The default is 128MB (134217728 bytes).
+
+**Tuning**
+- Start with the default (128MB). Decrease it if parallelism is
under-utilized; increase it if the number of splits is too large.
+- Rough rule: `file_split_size ≈ file_size / desired_parallelism`.
+
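+For illustration, a minimal HdfsFile source block with splitting enabled might
+look like the following sketch; the path, `fs.defaultFS`, and split size are
+placeholder values, not taken from this change:
+
+```hocon
+source {
+  HdfsFile {
+    # placeholder values for illustration only
+    fs.defaultFS = "hdfs://namenode:8020"
+    path = "/data/big_file_dir"
+    file_format_type = "text"
+    # turn on logical file splitting for large files
+    enable_file_split = true
+    # 64MB per split; default is 134217728 (128MB)
+    file_split_size = 67108864
+    # schema and other options omitted for brevity
+  }
+}
+```
+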
### quote_char [string]
A single character that encloses CSV fields, allowing fields with commas, line
breaks, or quotes to be read correctly.
diff --git a/docs/en/connectors/source/LocalFile.md
b/docs/en/connectors/source/LocalFile.md
index 5dcfad7cfa..72c01544c3 100644
--- a/docs/en/connectors/source/LocalFile.md
+++ b/docs/en/connectors/source/LocalFile.md
@@ -418,14 +418,27 @@ File modification time filter. The connector will filter
some files base on the
File modification time filter. The connector will filter some files base on
the last modification end time (not include end time). The default data format
is `yyyy-MM-dd HH:mm:ss`.
-### enable_file_split [string]
+### enable_file_split [boolean]
Turn on the file splitting function, the default is false.It can be selected
when the file type is csv, text, json, parquet and non-compressed format.
+**Recommendations**
+- Enable when reading a few large files and you want higher read parallelism.
+- Disable when reading many small files, or when parallelism is low (splitting
adds overhead).
+
+**Limitations**
+- Not supported for compressed files (`compress_codec` != `none`) or archive
files (`archive_compress_codec` != `none`) — it will fall back to non-splitting.
+- For `text`/`csv`/`json`, actual split size may be larger than
`file_split_size` because the split end is aligned to the next `row_delimiter`.
+- LocalFile uses Hadoop LocalFileSystem internally; no extra Hadoop
configuration is required.
+
### file_split_size [long]
File split size, which can be filled in when the enable_file_split parameter
is true. The unit is the number of bytes. The default value is the number of
bytes of 128MB, which is 134217728.
+**Tuning**
+- Start with the default (128MB). Decrease it if parallelism is
under-utilized; increase it if the number of splits is too large.
+- Rough rule: `file_split_size ≈ file_size / desired_parallelism`.
+
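+For illustration, a minimal LocalFile source block with splitting enabled might
+look like the following sketch; the path and split size are placeholder values,
+not taken from this change:
+
+```hocon
+source {
+  LocalFile {
+    # placeholder values for illustration only
+    path = "/tmp/seatunnel/big_file_dir"
+    file_format_type = "csv"
+    # turn on logical file splitting for large files
+    enable_file_split = true
+    # 64MB per split; default is 134217728 (128MB)
+    file_split_size = 67108864
+    # schema and other options omitted for brevity
+  }
+}
+```
+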
### quote_char [string]
A single character that encloses CSV fields, allowing fields with commas, line
breaks, or quotes to be read correctly.
diff --git a/docs/zh/connectors/source/HdfsFile.md
b/docs/zh/connectors/source/HdfsFile.md
index 811059e6c5..d17b8da333 100644
--- a/docs/zh/connectors/source/HdfsFile.md
+++ b/docs/zh/connectors/source/HdfsFile.md
@@ -88,6 +88,8 @@ import ChangeLog from '../changelog/connector-file-hadoop.md';
| common-options | | 否 | - |
数据源插件通用参数,请参阅 [数据源通用选项](../source-common-options.md) 了解详情。
|
| file_filter_modified_start | string | 否 | - |
按照最后修改时间过滤文件。 要过滤的开始时间(包括改时间),时间格式是:`yyyy-MM-dd HH:mm:ss`
|
| file_filter_modified_end | string | 否 | - |
按照最后修改时间过滤文件。 要过滤的结束时间(不包括改时间),时间格式是:`yyyy-MM-dd HH:mm:ss`
|
+| enable_file_split | boolean | 否 | false |
开启大文件拆分以提升并行度。仅支持 `text`/`csv`/`json`/`parquet` 且非压缩格式(`compress_codec=none` 且
`archive_compress_codec=none`)。
|
+| file_split_size | long | 否 | 134217728 |
`enable_file_split=true` 时生效,单位字节。`text`/`csv`/`json` 按 `file_split_size`
拆分并对齐到下一个 `row_delimiter`;`parquet` 以 RowGroup 为拆分单位,不会切开 RowGroup。
|
| quote_char | string | 否 | " | 用于包裹 CSV
字段的单字符,可保证包含逗号、换行符或引号的字段被正确解析。
|
| escape_char | string | 否 | - | 用于在 CSV
字段内转义引号或其他特殊字符,使其不会结束字段。
|
@@ -255,6 +257,30 @@ abc.*
- `len_mtime`:`len` 与 `mtime` 都相同才 SKIP,否则 COPY。
- `checksum`:要求 `len` 相同且 Hadoop `getFileChecksum` 相同才 SKIP,否则 COPY(仅在
`update_strategy=strict` 时生效)。
+
+### enable_file_split [boolean]
+
+开启大文件拆分功能,默认 false。仅支持 `csv`/`text`/`json`/`parquet`
且非压缩格式(`compress_codec=none` 且 `archive_compress_codec=none`)。
+
+- `text`/`csv`/`json`:按 `file_split_size` 拆分并对齐到下一个
`row_delimiter`,避免切开一行/一条记录。
+- `parquet`:以 RowGroup 为逻辑拆分单位,不会切开 RowGroup。
+
+**使用建议**
+- 适合:读取少量大文件,并希望通过更高并行度提升吞吐。
+- 不建议:读取大量小文件,或并行度较低的场景(拆分会带来额外的枚举/调度开销)。
+
+**限制说明**
+- 不支持压缩文件(`compress_codec` != `none`)或归档文件(`archive_compress_codec` !=
`none`),会自动回退为不拆分。
+- 对于 `text`/`csv`/`json`,实际 split 的大小可能略大于 `file_split_size`(因为需要对齐到下一个
`row_delimiter`)。
+
+### file_split_size [long]
+
+`enable_file_split=true` 时生效,单位字节。默认 128MB(134217728)。
+
+**调优建议**
+- 建议从默认值(128MB)开始:如果并行度未充分利用可适当调小;如果 split 数量过多可适当调大。
+- 经验公式:`file_split_size ≈ file_size / 期望并行度`。
+
### quote_char [string]
用于包裹 CSV 字段的单字符,可保证包含逗号、换行符或引号的字段被正确解析。
diff --git a/docs/zh/connectors/source/LocalFile.md
b/docs/zh/connectors/source/LocalFile.md
index 0453b09083..ef6dabed25 100644
--- a/docs/zh/connectors/source/LocalFile.md
+++ b/docs/zh/connectors/source/LocalFile.md
@@ -423,10 +423,23 @@ null_format 定义哪些字符串可以表示为 null。
开启文件分割功能,默认为false。文件类型为csv、text、json、parquet非压缩格式时可选择。
+**使用建议**
+- 适合:读取少量大文件,并希望通过更高并行度提升吞吐。
+- 不建议:读取大量小文件,或并行度较低的场景(拆分会带来额外的枚举/调度开销)。
+
+**限制说明**
+- 不支持压缩文件(`compress_codec` != `none`)或归档文件(`archive_compress_codec` !=
`none`),会自动回退为不拆分。
+- 对于 `text`/`csv`/`json`,实际 split 的大小可能略大于 `file_split_size`(因为需要对齐到下一个
`row_delimiter`)。
+- LocalFile 内部使用 Hadoop LocalFileSystem(`file:///`),通常不需要额外 Hadoop 配置。
+
### file_split_size [long]
文件分割大小,enable_file_split参数为true时可以填写。单位是字节数。默认值为128MB的字节数,即134217728。
+**调优建议**
+- 建议从默认值(128MB)开始:如果并行度未充分利用可适当调小;如果 split 数量过多可适当调大。
+- 经验公式:`file_split_size ≈ file_size / 期望并行度`。
+
### quote_char [string]
用于包裹 CSV 字段的单字符,可保证包含逗号、换行符或引号的字段被正确解析。
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/exception/FileConnectorErrorCode.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/exception/FileConnectorErrorCode.java
index db30dfcb8f..23b3ea5fb3 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/exception/FileConnectorErrorCode.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/exception/FileConnectorErrorCode.java
@@ -30,7 +30,10 @@ public enum FileConnectorErrorCode implements
SeaTunnelErrorCode {
FILE_READ_FAILED("FILE-08", "File read failed"),
BINARY_FILE_PART_ORDER_ERROR("FILE-09", "Binary file fragment order
abnormality"),
FILE_SPLIT_SIZE_ILLEGAL("FILE-10", "SplitSizeBytes must be greater than
0"),
- FILE_SPLIT_FAIL("FILE-11", "File split fail");
+ FILE_SPLIT_FAIL("FILE-11", "File split fail"),
+ FILE_NOT_FOUND("FILE-12", "File not found"),
+ FILE_ACCESS_DENIED("FILE-13", "File access denied"),
+ FILE_IO_TIMEOUT("FILE-14", "File IO timeout");
private final String code;
private final String description;
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/BaseMultipleTableFileSource.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/BaseMultipleTableFileSource.java
index 6626ccd2a9..5b7c91913f 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/BaseMultipleTableFileSource.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/BaseMultipleTableFileSource.java
@@ -31,10 +31,14 @@ import
org.apache.seatunnel.connectors.seatunnel.file.source.reader.MultipleTabl
import
org.apache.seatunnel.connectors.seatunnel.file.source.split.DefaultFileSplitStrategy;
import
org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSourceSplit;
import
org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSplitStrategy;
+import
org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSplitStrategyFactory;
import
org.apache.seatunnel.connectors.seatunnel.file.source.split.MultipleTableFileSourceSplitEnumerator;
+import
org.apache.seatunnel.connectors.seatunnel.file.source.split.MultipleTableFileSplitStrategy;
import
org.apache.seatunnel.connectors.seatunnel.file.source.state.FileSourceState;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.stream.Collectors;
public abstract class BaseMultipleTableFileSource
@@ -58,6 +62,21 @@ public abstract class BaseMultipleTableFileSource
this.fileSplitStrategy = fileSplitStrategy;
}
+ protected static FileSplitStrategy initFileSplitStrategy(
+ BaseMultipleTableFileSourceConfig sourceConfig) {
+ Map<String, FileSplitStrategy> splitStrategies = new HashMap<>();
+ for (BaseFileSourceConfig fileSourceConfig :
sourceConfig.getFileSourceConfigs()) {
+ String tableId =
+
fileSourceConfig.getCatalogTable().getTableId().toTablePath().toString();
+ splitStrategies.put(
+ tableId,
+ FileSplitStrategyFactory.initFileSplitStrategy(
+ fileSourceConfig.getBaseFileSourceConfig(),
+ fileSourceConfig.getHadoopConfig()));
+ }
+ return new MultipleTableFileSplitStrategy(splitStrategies);
+ }
+
@Override
public Boundedness getBoundedness() {
return Boundedness.BOUNDED;
@@ -91,6 +110,9 @@ public abstract class BaseMultipleTableFileSource
SourceSplitEnumerator.Context<FileSourceSplit> enumeratorContext,
FileSourceState checkpointState) {
return new MultipleTableFileSourceSplitEnumerator(
- enumeratorContext, baseMultipleTableFileSourceConfig,
checkpointState);
+ enumeratorContext,
+ baseMultipleTableFileSourceConfig,
+ fileSplitStrategy,
+ checkpointState);
}
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
index e64eee5795..80b88cd170 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
@@ -50,6 +50,7 @@ import org.apache.commons.io.input.BoundedInputStream;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.Seekable;
import lombok.extern.slf4j.Slf4j;
@@ -522,13 +523,22 @@ public abstract class AbstractReadStrategy implements
ReadStrategy {
protected static InputStream safeSlice(InputStream in, long start, long
length)
throws IOException {
- long toSkip = start;
- while (toSkip > 0) {
- long skipped = in.skip(toSkip);
- if (skipped <= 0) {
- throw new SeaTunnelException("skipped error");
+ if (start > 0) {
+ if (in instanceof Seekable) {
+ ((Seekable) in).seek(start);
+ } else {
+ long toSkip = start;
+ while (toSkip > 0) {
+ long skipped = in.skip(toSkip);
+ if (skipped <= 0) {
+ throw new SeaTunnelException("skipped error");
+ }
+ toSkip -= skipped;
+ }
}
- toSkip -= skipped;
+ }
+ if (length < 0) {
+ return in;
}
return new BoundedInputStream(in, length);
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/AccordingToSplitSizeSplitStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/AccordingToSplitSizeSplitStrategy.java
index 11117fef88..ca9a60e979 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/AccordingToSplitSizeSplitStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/AccordingToSplitSizeSplitStrategy.java
@@ -17,88 +17,182 @@
package org.apache.seatunnel.connectors.seatunnel.file.source.split;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.security.AccessControlException;
+
+import java.io.Closeable;
+import java.io.FileNotFoundException;
import java.io.IOException;
-import java.io.InputStream;
+import java.io.InterruptedIOException;
+import java.net.SocketTimeoutException;
import java.nio.charset.Charset;
+import java.nio.file.AccessDeniedException;
+import java.nio.file.NoSuchFileException;
+import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
-public abstract class AccordingToSplitSizeSplitStrategy implements
FileSplitStrategy {
+/**
+ * {@link AccordingToSplitSizeSplitStrategy} defines a split strategy for
text-like files by using
+ * {@code rowDelimiter} as the minimum indivisible unit and generating {@link
FileSourceSplit}s by
+ * merging one or more contiguous rows according to the configured split size.
+ *
+ * <p>This strategy will never break a row delimiter, ensuring each split
starts at a row boundary.
+ *
+ * <p>To avoid scanning the whole file for large files, this strategy uses
{@link FSDataInputStream}
+ * seek to locate the next delimiter around each split boundary.
+ */
+public class AccordingToSplitSizeSplitStrategy implements FileSplitStrategy,
Closeable {
+ private static final int BUFFER_SIZE = 64 * 1024;
+
+ private final HadoopFileSystemProxy hadoopFileSystemProxy;
private final long skipHeaderRowNumber;
private final long splitSize;
private final byte[] delimiterBytes;
- private static final int BUFFER_SIZE = 64 * 1024;
public AccordingToSplitSizeSplitStrategy(
- String rowDelimiter, long skipHeaderRowNumber, String
encodingName, long splitSize) {
+ HadoopConf hadoopConf,
+ String rowDelimiter,
+ long skipHeaderRowNumber,
+ String encodingName,
+ long splitSize) {
+ if (splitSize <= 0) {
+ throw new SeaTunnelRuntimeException(
+ FileConnectorErrorCode.FILE_SPLIT_SIZE_ILLEGAL,
+ "SplitSizeBytes must be greater than 0");
+ }
+ if (rowDelimiter == null || rowDelimiter.isEmpty()) {
+ throw new SeaTunnelRuntimeException(
+ FileConnectorErrorCode.FILE_SPLIT_FAIL, "rowDelimiter must
not be empty");
+ }
+ this.hadoopFileSystemProxy = new HadoopFileSystemProxy(hadoopConf);
this.skipHeaderRowNumber = skipHeaderRowNumber;
this.splitSize = splitSize;
this.delimiterBytes =
rowDelimiter.getBytes(Charset.forName(encodingName));
+ if (delimiterBytes.length == 0) {
+ throw new SeaTunnelRuntimeException(
+ FileConnectorErrorCode.FILE_SPLIT_FAIL,
+ "rowDelimiter must not be empty after encoding");
+ }
}
@Override
public List<FileSourceSplit> split(String tableId, String filePath) {
+ String normalizedPath = normalizePath(filePath);
List<FileSourceSplit> splits = new ArrayList<>();
- long fileSize = safeGetFileSize(filePath);
+ long fileSize = safeGetFileSize(normalizedPath);
if (fileSize == 0) {
return splits;
}
- long currentStart = 0;
- if (skipHeaderRowNumber > 0) {
- currentStart = skipHeaderWithBuffer(filePath, skipHeaderRowNumber);
- }
- while (currentStart < fileSize) {
- long tentativeEnd = currentStart + splitSize;
- if (tentativeEnd >= fileSize) {
+ try (FSDataInputStream input =
hadoopFileSystemProxy.getInputStream(normalizedPath)) {
+ long currentStart = 0;
+ if (skipHeaderRowNumber > 0) {
+ currentStart = skipLinesUsingBuffer(input,
skipHeaderRowNumber);
+ }
+ while (currentStart < fileSize) {
+ long tentativeEnd = currentStart + splitSize;
+ if (tentativeEnd >= fileSize) {
+ splits.add(
+ new FileSourceSplit(
+ tableId,
+ normalizedPath,
+ currentStart,
+ fileSize - currentStart));
+ break;
+ }
+ long actualEnd = findNextDelimiterWithSeek(input,
tentativeEnd, fileSize);
+ if (actualEnd <= currentStart) {
+ actualEnd = tentativeEnd;
+ }
splits.add(
new FileSourceSplit(
- tableId, filePath, currentStart, fileSize -
currentStart));
- break;
- }
- long actualEnd = findNextDelimiterWithBuffer(filePath,
tentativeEnd);
- if (actualEnd <= currentStart) {
- actualEnd = tentativeEnd;
+ tableId, normalizedPath, currentStart,
actualEnd - currentStart));
+ currentStart = actualEnd;
}
- splits.add(
- new FileSourceSplit(tableId, filePath, currentStart,
actualEnd - currentStart));
- currentStart = actualEnd;
+ return splits;
+ } catch (IOException e) {
+ throw mapToRuntimeException(normalizedPath, "Split file", e);
}
- return splits;
}
- protected abstract InputStream getInputStream(String filePath) throws
IOException;
-
- protected abstract long getFileSize(String filePath) throws IOException;
-
private long safeGetFileSize(String filePath) {
try {
- return getFileSize(filePath);
+ return hadoopFileSystemProxy.getFileStatus(filePath).getLen();
} catch (IOException e) {
- throw new
SeaTunnelRuntimeException(FileConnectorErrorCode.FILE_READ_FAILED, e);
+ throw mapToRuntimeException(filePath, "Get file status", e);
}
}
- private long skipHeaderWithBuffer(String filePath, long skipLines) {
- try (InputStream input = getInputStream(filePath)) {
- return skipLinesUsingBuffer(input, skipLines);
- } catch (IOException e) {
- throw new
SeaTunnelRuntimeException(FileConnectorErrorCode.FILE_READ_FAILED, e);
+ private static SeaTunnelRuntimeException mapToRuntimeException(
+ String filePath, String operation, IOException e) {
+ IOException unwrapped = unwrapRemoteException(e);
+ FileConnectorErrorCode errorCode =
mapIOExceptionToErrorCode(unwrapped);
+ String message =
+ String.format(
+ "%s for [%s] failed, cause=%s: %s",
+ operation,
+ filePath,
+ unwrapped.getClass().getSimpleName(),
+ unwrapped.getMessage());
+ return new SeaTunnelRuntimeException(errorCode, message, unwrapped);
+ }
+
+ private static FileConnectorErrorCode
mapIOExceptionToErrorCode(IOException e) {
+ if (hasCause(e, FileNotFoundException.class) || hasCause(e,
NoSuchFileException.class)) {
+ return FileConnectorErrorCode.FILE_NOT_FOUND;
+ }
+ if (hasCause(e, AccessDeniedException.class) || hasCause(e,
AccessControlException.class)) {
+ return FileConnectorErrorCode.FILE_ACCESS_DENIED;
+ }
+ if (hasCause(e, SocketTimeoutException.class)
+ || hasCause(e, InterruptedIOException.class)) {
+ return FileConnectorErrorCode.FILE_IO_TIMEOUT;
+ }
+ return FileConnectorErrorCode.FILE_READ_FAILED;
+ }
+
+ private static boolean hasCause(Throwable throwable, Class<? extends
Throwable> type) {
+ Throwable current = throwable;
+ while (current != null) {
+ if (type.isInstance(current)) {
+ return true;
+ }
+ current = current.getCause();
}
+ return false;
}
- private long skipLinesUsingBuffer(InputStream is, long skipLines) throws
IOException {
+ private static IOException unwrapRemoteException(IOException e) {
+ if (e instanceof RemoteException) {
+ return ((RemoteException) e)
+ .unwrapRemoteException(
+ FileNotFoundException.class,
+ NoSuchFileException.class,
+ AccessControlException.class,
+ AccessDeniedException.class,
+ SocketTimeoutException.class,
+ InterruptedIOException.class);
+ }
+ return e;
+ }
+
+ private long skipLinesUsingBuffer(FSDataInputStream input, long skipLines)
throws IOException {
+ input.seek(0);
byte[] buffer = new byte[BUFFER_SIZE];
- long matched = 0;
+ int matched = 0;
long lines = 0;
long pos = 0;
int n;
- while ((n = is.read(buffer)) != -1) {
+ while ((n = input.read(buffer)) != -1) {
for (int i = 0; i < n; i++) {
pos++;
- if (buffer[i] == delimiterBytes[(int) matched]) {
+ if (buffer[i] == delimiterBytes[matched]) {
matched++;
if (matched == delimiterBytes.length) {
matched = 0;
@@ -108,53 +202,59 @@ public abstract class AccordingToSplitSizeSplitStrategy
implements FileSplitStra
}
}
} else {
- matched = 0;
+ matched = buffer[i] == delimiterBytes[0] ? 1 : 0;
}
}
}
-
return pos;
}
- private long findNextDelimiterWithBuffer(String filePath, long startPos) {
- try (InputStream is = getInputStream(filePath)) {
- long skipped = skipManually(is, startPos);
- if (skipped < startPos) {
- return startPos;
- }
- byte[] buffer = new byte[BUFFER_SIZE];
- long matched = 0;
- long pos = startPos;
- int n;
- while ((n = is.read(buffer)) != -1) {
- for (int i = 0; i < n; i++) {
- pos++;
- if (buffer[i] == delimiterBytes[(int) matched]) {
- matched++;
- if (matched == delimiterBytes.length) {
- return pos;
+ private long findNextDelimiterWithSeek(FSDataInputStream input, long
startPos, long fileSize)
+ throws IOException {
+ long scanStart = Math.max(0, startPos - (delimiterBytes.length - 1));
+ input.seek(scanStart);
+ byte[] buffer = new byte[BUFFER_SIZE];
+ int matched = 0;
+ long pos = scanStart;
+ int n;
+ while ((n = input.read(buffer)) != -1) {
+ for (int i = 0; i < n; i++) {
+ pos++;
+ if (buffer[i] == delimiterBytes[matched]) {
+ matched++;
+ if (matched == delimiterBytes.length) {
+ long endPos = pos;
+ if (endPos >= startPos) {
+ return endPos;
}
- } else {
matched = 0;
}
+ } else {
+ matched = buffer[i] == delimiterBytes[0] ? 1 : 0;
}
}
- return pos;
-
- } catch (IOException e) {
- throw new
SeaTunnelRuntimeException(FileConnectorErrorCode.FILE_READ_FAILED, e);
}
+ return Math.min(fileSize, pos);
}
- private long skipManually(InputStream is, long bytesToSkip) throws
IOException {
- byte[] buffer = new byte[BUFFER_SIZE];
- long total = 0;
- while (total < bytesToSkip) {
- long toRead = Math.min(buffer.length, bytesToSkip - total);
- int n = is.read(buffer, 0, (int) toRead);
- if (n == -1) break;
- total += n;
- }
- return total;
+ @Override
+ public void close() throws IOException {
+ hadoopFileSystemProxy.close();
+ }
+
+ private static String normalizePath(String filePath) {
+ if (filePath == null) {
+ return null;
+ }
+ if (filePath.contains("://")) {
+ return filePath;
+ }
+ if (filePath.length() >= 3
+ && Character.isLetter(filePath.charAt(0))
+ && filePath.charAt(1) == ':'
+ && (filePath.charAt(2) == '\\' || filePath.charAt(2) == '/')) {
+ return Paths.get(filePath).toUri().toString();
+ }
+ return filePath;
}
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSourceSplit.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSourceSplit.java
index fea28898b0..15053b162d 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSourceSplit.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSourceSplit.java
@@ -21,6 +21,8 @@ import org.apache.seatunnel.api.source.SourceSplit;
import lombok.Getter;
+import java.io.IOException;
+import java.io.ObjectInputStream;
import java.util.Objects;
public class FileSourceSplit implements SourceSplit {
@@ -48,6 +50,15 @@ public class FileSourceSplit implements SourceSplit {
this.length = length;
}
+ private void readObject(ObjectInputStream in) throws IOException,
ClassNotFoundException {
+ in.defaultReadObject();
+ // Compatibility: old checkpoints (before file-split fields)
deserialize with
+ // start=0/length=0.
+ if (start == 0L && length == 0L) {
+ length = -1L;
+ }
+ }
+
@Override
public String splitId() {
// In order to be compatible with the split before the upgrade, when
tableId is null,
@@ -55,6 +66,9 @@ public class FileSourceSplit implements SourceSplit {
if (tableId == null) {
return filePath;
}
+ if (start == 0L && length < 0L) {
+ return tableId + "_" + filePath;
+ }
return tableId + "_" + filePath + "_" + start;
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/split/LocalFileSplitStrategyFactory.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSplitStrategyFactory.java
similarity index 79%
rename from
seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/split/LocalFileSplitStrategyFactory.java
rename to
seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSplitStrategyFactory.java
index d6b632e164..a801927e9d 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/split/LocalFileSplitStrategyFactory.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSplitStrategyFactory.java
@@ -14,22 +14,23 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.file.local.source.split;
+package org.apache.seatunnel.connectors.seatunnel.file.source.split;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import
org.apache.seatunnel.connectors.seatunnel.file.config.ArchiveCompressFormat;
import org.apache.seatunnel.connectors.seatunnel.file.config.CompressFormat;
import
org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSourceOptions;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
-import
org.apache.seatunnel.connectors.seatunnel.file.source.split.DefaultFileSplitStrategy;
-import
org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSplitStrategy;
-import
org.apache.seatunnel.connectors.seatunnel.file.source.split.ParquetFileSplitStrategy;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+
+import java.util.Objects;
import static
org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSourceOptions.DEFAULT_ROW_DELIMITER;
-public class LocalFileSplitStrategyFactory {
+public class FileSplitStrategyFactory {
- public static FileSplitStrategy initFileSplitStrategy(ReadonlyConfig
readonlyConfig) {
+ public static FileSplitStrategy initFileSplitStrategy(
+ ReadonlyConfig readonlyConfig, HadoopConf hadoopConf) {
if (!readonlyConfig.get(FileBaseSourceOptions.ENABLE_FILE_SPLIT)) {
return new DefaultFileSplitStrategy();
}
@@ -41,9 +42,13 @@ public class LocalFileSplitStrategyFactory {
!= ArchiveCompressFormat.NONE) {
return new DefaultFileSplitStrategy();
}
+
+ Objects.requireNonNull(
+ hadoopConf, "hadoopConf must not be null when file split is
enabled");
+
long fileSplitSize =
readonlyConfig.get(FileBaseSourceOptions.FILE_SPLIT_SIZE);
if (FileFormat.PARQUET ==
readonlyConfig.get(FileBaseSourceOptions.FILE_FORMAT_TYPE)) {
- return new ParquetFileSplitStrategy(fileSplitSize);
+ return new ParquetFileSplitStrategy(fileSplitSize, hadoopConf);
}
String rowDelimiter =
!readonlyConfig.getOptional(FileBaseSourceOptions.ROW_DELIMITER).isPresent()
@@ -54,7 +59,7 @@ public class LocalFileSplitStrategyFactory {
? 1L
:
readonlyConfig.get(FileBaseSourceOptions.SKIP_HEADER_ROW_NUMBER);
String encodingName =
readonlyConfig.get(FileBaseSourceOptions.ENCODING);
- return new LocalFileAccordingToSplitSizeSplitStrategy(
- rowDelimiter, skipHeaderRowNumber, encodingName,
fileSplitSize);
+ return new AccordingToSplitSizeSplitStrategy(
+ hadoopConf, rowDelimiter, skipHeaderRowNumber, encodingName,
fileSplitSize);
}
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/MultipleTableFileSourceSplitEnumerator.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/MultipleTableFileSourceSplitEnumerator.java
index 31b46140d5..b0c5dd2ad9 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/MultipleTableFileSourceSplitEnumerator.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/MultipleTableFileSourceSplitEnumerator.java
@@ -26,9 +26,11 @@ import org.apache.commons.collections4.CollectionUtils;
import lombok.extern.slf4j.Slf4j;
+import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -41,6 +43,8 @@ import java.util.stream.Collectors;
public class MultipleTableFileSourceSplitEnumerator
implements SourceSplitEnumerator<FileSourceSplit, FileSourceState> {
+ private static final int LOG_SPLIT_ID_LIMIT = 50;
+
private final Context<FileSourceSplit> context;
private final Set<FileSourceSplit> allSplit;
private final Set<FileSourceSplit> assignedSplit;
@@ -78,15 +82,42 @@ public class MultipleTableFileSourceSplitEnumerator
this.assignedSplit.addAll(fileSourceState.getAssignedSplit());
}
+ public MultipleTableFileSourceSplitEnumerator(
+ Context<FileSourceSplit> context,
+ BaseMultipleTableFileSourceConfig multipleTableFileSourceConfig,
+ FileSplitStrategy fileSplitStrategy,
+ FileSourceState fileSourceState) {
+ this(context, multipleTableFileSourceConfig, fileSplitStrategy);
+ this.assignedSplit.addAll(fileSourceState.getAssignedSplit());
+ }
+
@Override
public void open() {
+ boolean hasMultiSplits = false;
+ Map<String, Integer> splitCountByTable = new HashMap<>();
for (Map.Entry<String, List<String>> filePathEntry :
filePathMap.entrySet()) {
String tableId = filePathEntry.getKey();
List<String> filePaths = filePathEntry.getValue();
for (String filePath : filePaths) {
- allSplit.addAll(fileSplitStrategy.split(tableId, filePath));
+ List<FileSourceSplit> splits =
fileSplitStrategy.split(tableId, filePath);
+ splitCountByTable.merge(tableId, splits.size(), Integer::sum);
+ allSplit.addAll(splits);
+ if (splits.size() > 1) {
+ hasMultiSplits = true;
+ log.info(
+ "Split file [{}] for table [{}] into {} splits",
+ filePath,
+ tableId,
+ splits.size());
+ }
}
}
+ if (hasMultiSplits) {
+ log.info(
+ "Split enumeration finished, total splits: {}, splits by
table: {}",
+ allSplit.size(),
+ splitCountByTable);
+ }
}
@Override
@@ -146,13 +177,27 @@ public class MultipleTableFileSourceSplitEnumerator
log.info(
"SubTask {} is assigned to [{}], size {}",
taskId,
- currentTaskSplits.stream()
- .map(FileSourceSplit::splitId)
- .collect(Collectors.joining(",")),
+ summarizeSplitIds(currentTaskSplits),
currentTaskSplits.size());
context.signalNoMoreSplits(taskId);
}
+ private static String summarizeSplitIds(List<FileSourceSplit> splits) {
+ if (splits.isEmpty()) {
+ return "";
+ }
+ if (splits.size() <= LOG_SPLIT_ID_LIMIT) {
+ return
splits.stream().map(FileSourceSplit::splitId).collect(Collectors.joining(","));
+ }
+ return splits.stream()
+ .limit(LOG_SPLIT_ID_LIMIT)
+ .map(FileSourceSplit::splitId)
+ .collect(Collectors.joining(","))
+ + ",...("
+ + (splits.size() - LOG_SPLIT_ID_LIMIT)
+ + " more)";
+ }
+
private static int getSplitOwner(int assignCount, int numReaders) {
return assignCount % numReaders;
}
@@ -169,6 +214,19 @@ public class MultipleTableFileSourceSplitEnumerator
@Override
public void close() throws IOException {
- // do nothing
+ if (fileSplitStrategy instanceof Closeable) {
+ ((Closeable) fileSplitStrategy).close();
+ return;
+ }
+ if (fileSplitStrategy instanceof AutoCloseable) {
+ try {
+ ((AutoCloseable) fileSplitStrategy).close();
+ } catch (Exception e) {
+ if (e instanceof IOException) {
+ throw (IOException) e;
+ }
+ throw new IOException(e);
+ }
+ }
}
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/MultipleTableFileSplitStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/MultipleTableFileSplitStrategy.java
new file mode 100644
index 0000000000..ce76e3c9b8
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/MultipleTableFileSplitStrategy.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.connectors.seatunnel.file.source.split;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+public class MultipleTableFileSplitStrategy implements FileSplitStrategy,
Closeable {
+
+ private final Map<String, FileSplitStrategy> delegateStrategies;
+ private final FileSplitStrategy fallbackStrategy;
+
+ public MultipleTableFileSplitStrategy(Map<String, FileSplitStrategy>
delegateStrategies) {
+ this.delegateStrategies = Objects.requireNonNull(delegateStrategies,
"delegateStrategies");
+ this.fallbackStrategy = new DefaultFileSplitStrategy();
+ }
+
+ @Override
+ public java.util.List<FileSourceSplit> split(String tableId, String
filePath) {
+ FileSplitStrategy delegate = delegateStrategies.get(tableId);
+ if (delegate == null) {
+ return fallbackStrategy.split(tableId, filePath);
+ }
+ return delegate.split(tableId, filePath);
+ }
+
+ @Override
+ public void close() throws IOException {
+ IOException exception = null;
+ Set<FileSplitStrategy> uniqueStrategies = new
HashSet<>(delegateStrategies.values());
+ for (FileSplitStrategy strategy : uniqueStrategies) {
+ try {
+ if (strategy instanceof Closeable) {
+ ((Closeable) strategy).close();
+ continue;
+ }
+ if (strategy instanceof AutoCloseable) {
+ ((AutoCloseable) strategy).close();
+ }
+ } catch (Exception e) {
+ IOException current =
+ e instanceof IOException ? (IOException) e : new
IOException(e);
+ if (exception == null) {
+ exception = current;
+ } else {
+ exception.addSuppressed(current);
+ }
+ }
+ }
+ if (exception != null) {
+ throw exception;
+ }
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/ParquetFileSplitStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/ParquetFileSplitStrategy.java
index 48b7adc102..93a12d3213 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/ParquetFileSplitStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/ParquetFileSplitStrategy.java
@@ -18,7 +18,9 @@
package org.apache.seatunnel.connectors.seatunnel.file.source.split;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -26,6 +28,7 @@ import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.util.HadoopInputFile;
+import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -46,9 +49,10 @@ import java.util.List;
* <p>This design enables efficient parallel reading of Parquet files while
preserving Parquet
* format semantics and avoiding invalid byte-level splits.
*/
-public class ParquetFileSplitStrategy implements FileSplitStrategy {
+public class ParquetFileSplitStrategy implements FileSplitStrategy, Closeable {
private final long splitSizeBytes;
+ private final HadoopFileSystemProxy hadoopFileSystemProxy;
public ParquetFileSplitStrategy(long splitSizeBytes) {
if (splitSizeBytes <= 0) {
@@ -57,6 +61,17 @@ public class ParquetFileSplitStrategy implements
FileSplitStrategy {
"SplitSizeBytes must be greater than 0");
}
this.splitSizeBytes = splitSizeBytes;
+ this.hadoopFileSystemProxy = null;
+ }
+
+ public ParquetFileSplitStrategy(long splitSizeBytes, HadoopConf
hadoopConf) {
+ if (splitSizeBytes <= 0) {
+ throw new SeaTunnelRuntimeException(
+ FileConnectorErrorCode.FILE_SPLIT_SIZE_ILLEGAL,
+ "SplitSizeBytes must be greater than 0");
+ }
+ this.splitSizeBytes = splitSizeBytes;
+ this.hadoopFileSystemProxy = new HadoopFileSystemProxy(hadoopConf);
}
@Override
@@ -78,41 +93,74 @@ public class ParquetFileSplitStrategy implements
FileSplitStrategy {
return splits;
}
long currentStart = 0;
- long currentLength = 0;
+ long currentEnd = 0;
boolean hasOpenSplit = false;
for (BlockMetaData block : rowGroups) {
long rgStart = block.getStartingPos();
long rgSize = block.getCompressedSize();
+ long rgEnd = rgStart + rgSize;
// start a new split
if (!hasOpenSplit) {
currentStart = rgStart;
- currentLength = rgSize;
+ currentEnd = rgEnd;
hasOpenSplit = true;
continue;
}
// exceeds threshold, close current split
- if (currentLength + rgSize > splitSizeBytes) {
- splits.add(new FileSourceSplit(tableId, filePath,
currentStart, currentLength));
+ if (rgEnd - currentStart > splitSizeBytes) {
+ splits.add(
+ new FileSourceSplit(
+ tableId, filePath, currentStart, currentEnd -
currentStart));
// start next split
currentStart = rgStart;
- currentLength = rgSize;
+ currentEnd = rgEnd;
} else {
- currentLength += rgSize;
+ currentEnd = rgEnd;
}
}
// last split
- if (hasOpenSplit && currentLength > 0) {
- splits.add(new FileSourceSplit(tableId, filePath, currentStart,
currentLength));
+ if (hasOpenSplit && currentEnd > currentStart) {
+ splits.add(
+ new FileSourceSplit(
+ tableId, filePath, currentStart, currentEnd -
currentStart));
}
return splits;
}
private List<BlockMetaData> readRowGroups(String filePath) throws
IOException {
Path path = new Path(filePath);
- Configuration conf = new Configuration();
- try (ParquetFileReader reader =
- ParquetFileReader.open(HadoopInputFile.fromPath(path, conf))) {
- return reader.getFooter().getBlocks();
+ if (hadoopFileSystemProxy == null) {
+ Configuration conf = new Configuration();
+ try (ParquetFileReader reader =
+ ParquetFileReader.open(HadoopInputFile.fromPath(path,
conf))) {
+ return reader.getFooter().getBlocks();
+ }
+ }
+ try {
+ return hadoopFileSystemProxy.doWithHadoopAuth(
+ (configuration, userGroupInformation) -> {
+ try (ParquetFileReader reader =
+ ParquetFileReader.open(
+ HadoopInputFile.fromPath(path,
configuration))) {
+ return reader.getFooter().getBlocks();
+ }
+ });
+ } catch (Exception e) {
+ if (e instanceof IOException) {
+ throw (IOException) e;
+ }
+ if (e instanceof RuntimeException) {
+ throw (RuntimeException) e;
+ }
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (hadoopFileSystemProxy == null) {
+ return;
}
+ hadoopFileSystemProxy.close();
}
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategyTest.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategyTest.java
index 0b4e941364..c330d8ba36 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategyTest.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategyTest.java
@@ -36,6 +36,7 @@ import org.apache.avro.util.Utf8;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.Seekable;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
@@ -45,8 +46,11 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledOnOs;
import org.junit.jupiter.api.condition.OS;
+import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
@@ -57,6 +61,37 @@ import static
org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME
public class AbstractReadStrategyTest {
+ @Test
+ void testSafeSliceUsesSeekForSeekableStream() throws Exception {
+ byte[] data = "0123456789".getBytes(StandardCharsets.UTF_8);
+ TrackingSeekableInputStream in = new TrackingSeekableInputStream(data);
+
+ try (InputStream sliced = AbstractReadStrategy.safeSlice(in, 5, 3)) {
+ byte[] buffer = new byte[10];
+ int n = sliced.read(buffer);
+ Assertions.assertEquals(3, n);
+ Assertions.assertEquals("567", new String(buffer, 0, n,
StandardCharsets.UTF_8));
+ Assertions.assertTrue(in.seekCalled);
+ }
+ }
+
+ @Test
+ void testSafeSliceReadsToEndWhenLengthIsNegative() throws Exception {
+ byte[] data = "0123456789".getBytes(StandardCharsets.UTF_8);
+ TrackingSeekableInputStream in = new TrackingSeekableInputStream(data);
+
+ try (InputStream sliced = AbstractReadStrategy.safeSlice(in, 5, -1)) {
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ byte[] buffer = new byte[4];
+ int n;
+ while ((n = sliced.read(buffer)) != -1) {
+ out.write(buffer, 0, n);
+ }
+ Assertions.assertEquals("56789", new String(out.toByteArray(),
StandardCharsets.UTF_8));
+ Assertions.assertTrue(in.seekCalled);
+ }
+ }
+
@DisabledOnOs(OS.WINDOWS)
@Test
public void testReadDirectorySkipHiddenDirectories() throws Exception {
@@ -146,6 +181,52 @@ public class AbstractReadStrategyTest {
}
}
+ private static class TrackingSeekableInputStream extends InputStream
implements Seekable {
+ private final byte[] data;
+ private int pos;
+ private boolean seekCalled;
+
+ private TrackingSeekableInputStream(byte[] data) {
+ this.data = data;
+ this.pos = 0;
+ }
+
+ @Override
+ public int read() {
+ if (pos >= data.length) {
+ return -1;
+ }
+ return data[pos++] & 0xFF;
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) {
+ if (pos >= data.length) {
+ return -1;
+ }
+ int toRead = Math.min(len, data.length - pos);
+ System.arraycopy(data, pos, b, off, toRead);
+ pos += toRead;
+ return toRead;
+ }
+
+ @Override
+ public void seek(long newPos) {
+ this.seekCalled = true;
+ this.pos = (int) newPos;
+ }
+
+ @Override
+ public long getPos() {
+ return pos;
+ }
+
+ @Override
+ public boolean seekToNewSource(long targetPos) {
+ return false;
+ }
+ }
+
@Test
void testBothStartAndEndWithinRange() throws Exception {
try (CsvReadStrategy strategy = new CsvReadStrategy()) {
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSourceSplitCompatibilityTest.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSourceSplitCompatibilityTest.java
new file mode 100644
index 0000000000..9bbca0c1b9
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSourceSplitCompatibilityTest.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.connectors.seatunnel.file.source.split;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Assumptions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import javax.tools.JavaCompiler;
+import javax.tools.ToolProvider;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.lang.reflect.Constructor;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+public class FileSourceSplitCompatibilityTest {
+
+ private static final String LEGACY_SPLIT_CLASS_NAME =
+
"org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSourceSplit";
+
+ @TempDir private Path tempDir;
+
+ @Test
+ void testDeserializeLegacyTwoArgSplitDefaultsToWholeFile() throws
Exception {
+ byte[] legacyBytes = serializeLegacySplit(tempDir, "t",
"file:///tmp/test.txt");
+ FileSourceSplit split = deserialize(legacyBytes);
+
+ Assertions.assertEquals("t", split.getTableId());
+ Assertions.assertEquals("file:///tmp/test.txt", split.getFilePath());
+ Assertions.assertEquals(0L, split.getStart());
+ Assertions.assertEquals(-1L, split.getLength());
+ Assertions.assertEquals("t_file:///tmp/test.txt", split.splitId());
+ }
+
+ @Test
+ void testDeserializeLegacySingleArgSplitDefaultsToWholeFile() throws
Exception {
+ byte[] legacyBytes = serializeLegacySplit(tempDir,
"file:///tmp/test.txt");
+ FileSourceSplit split = deserialize(legacyBytes);
+
+ Assertions.assertNull(split.getTableId());
+ Assertions.assertEquals("file:///tmp/test.txt", split.getFilePath());
+ Assertions.assertEquals(0L, split.getStart());
+ Assertions.assertEquals(-1L, split.getLength());
+ Assertions.assertEquals("file:///tmp/test.txt", split.splitId());
+ }
+
+ private static FileSourceSplit deserialize(byte[] bytes) throws Exception {
+ try (ObjectInputStream in = new ObjectInputStream(new
ByteArrayInputStream(bytes))) {
+ Object obj = in.readObject();
+ Assertions.assertTrue(obj instanceof FileSourceSplit);
+ return (FileSourceSplit) obj;
+ }
+ }
+
+ private static byte[] serializeLegacySplit(Path tempDir, String tableId,
String filePath)
+ throws Exception {
+ Class<?> legacyClass = compileAndLoadLegacyClass(tempDir);
+ Constructor<?> ctor = legacyClass.getConstructor(String.class,
String.class);
+ Object legacySplit = ctor.newInstance(tableId, filePath);
+ return serialize(legacySplit);
+ }
+
+ private static byte[] serializeLegacySplit(Path tempDir, String splitId)
throws Exception {
+ Class<?> legacyClass = compileAndLoadLegacyClass(tempDir);
+ Constructor<?> ctor = legacyClass.getConstructor(String.class);
+ Object legacySplit = ctor.newInstance(splitId);
+ return serialize(legacySplit);
+ }
+
+ private static byte[] serialize(Object legacySplit) throws Exception {
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ try (ObjectOutputStream oos = new ObjectOutputStream(out)) {
+ oos.writeObject(legacySplit);
+ }
+ return out.toByteArray();
+ }
+
+ private static Class<?> compileAndLoadLegacyClass(Path tempDir) throws
Exception {
+ JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
+ Assumptions.assumeTrue(
+ compiler != null, "JDK compiler is required for legacy
compatibility test");
+
+ Path sourceRoot = tempDir.resolve("legacy-src");
+ Path outputRoot = tempDir.resolve("legacy-out");
+ Path sourceFile =
+ sourceRoot.resolve(
+
"org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSourceSplit.java");
+ Files.createDirectories(sourceFile.getParent());
+ Files.createDirectories(outputRoot);
+
+ Files.write(sourceFile,
legacySourceCode().getBytes(StandardCharsets.UTF_8));
+
+ String classpath = System.getProperty("java.class.path");
+ int result =
+ compiler.run(
+ null,
+ null,
+ null,
+ "-classpath",
+ classpath,
+ "-d",
+ outputRoot.toString(),
+ sourceFile.toString());
+ Assertions.assertEquals(0, result, "Failed to compile legacy
FileSourceSplit");
+
+ URL[] urls = new URL[] {outputRoot.toUri().toURL()};
+ try (ChildFirstClassLoader loader =
+ new ChildFirstClassLoader(
+ urls,
FileSourceSplitCompatibilityTest.class.getClassLoader())) {
+ return Class.forName(LEGACY_SPLIT_CLASS_NAME, true, loader);
+ }
+ }
+
+ private static String legacySourceCode() {
+ return "package
org.apache.seatunnel.connectors.seatunnel.file.source.split;\n"
+ + "\n"
+ + "import org.apache.seatunnel.api.source.SourceSplit;\n"
+ + "\n"
+ + "import java.util.Objects;\n"
+ + "\n"
+ + "public class FileSourceSplit implements SourceSplit {\n"
+ + " private static final long serialVersionUID = 1L;\n"
+ + "\n"
+ + " private final String tableId;\n"
+ + " private final String filePath;\n"
+ + "\n"
+ + " public FileSourceSplit(String splitId) {\n"
+ + " this.filePath = splitId;\n"
+ + " this.tableId = null;\n"
+ + " }\n"
+ + "\n"
+ + " public FileSourceSplit(String tableId, String filePath)
{\n"
+ + " this.tableId = tableId;\n"
+ + " this.filePath = filePath;\n"
+ + " }\n"
+ + "\n"
+ + " @Override\n"
+ + " public String splitId() {\n"
+ + " if (tableId == null) {\n"
+ + " return filePath;\n"
+ + " }\n"
+ + " return tableId + \"_\" + filePath;\n"
+ + " }\n"
+ + "\n"
+ + " @Override\n"
+ + " public boolean equals(Object o) {\n"
+ + " if (this == o) {\n"
+ + " return true;\n"
+ + " }\n"
+ + " if (o == null || getClass() != o.getClass()) {\n"
+ + " return false;\n"
+ + " }\n"
+ + " FileSourceSplit that = (FileSourceSplit) o;\n"
+ + " return Objects.equals(tableId, that.tableId)\n"
+ + " && Objects.equals(filePath,
that.filePath);\n"
+ + " }\n"
+ + "\n"
+ + " @Override\n"
+ + " public int hashCode() {\n"
+ + " return Objects.hash(tableId, filePath);\n"
+ + " }\n"
+ + "}\n";
+ }
+
+ private static final class ChildFirstClassLoader extends URLClassLoader {
+ private ChildFirstClassLoader(URL[] urls, ClassLoader parent) {
+ super(urls, parent);
+ }
+
+ @Override
+ protected Class<?> loadClass(String name, boolean resolve) throws
ClassNotFoundException {
+ synchronized (getClassLoadingLock(name)) {
+ if (LEGACY_SPLIT_CLASS_NAME.equals(name)) {
+ Class<?> loaded = findLoadedClass(name);
+ if (loaded == null) {
+ loaded = findClass(name);
+ }
+ if (resolve) {
+ resolveClass(loaded);
+ }
+ return loaded;
+ }
+ return super.loadClass(name, resolve);
+ }
+ }
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSource.java
b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSource.java
index 1d9c01e62f..413c1ec3ef 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSource.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSource.java
@@ -25,7 +25,11 @@ import
org.apache.seatunnel.connectors.seatunnel.file.source.BaseMultipleTableFi
public class HdfsFileSource extends BaseMultipleTableFileSource {
public HdfsFileSource(ReadonlyConfig readonlyConfig) {
- super(new MultipleTableHdfsFileSourceConfig(readonlyConfig));
+ this(new MultipleTableHdfsFileSourceConfig(readonlyConfig));
+ }
+
+ private HdfsFileSource(MultipleTableHdfsFileSourceConfig sourceConfig) {
+ super(sourceConfig, initFileSplitStrategy(sourceConfig));
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSourceFactory.java
b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSourceFactory.java
index 6f8211873a..e2cbd802de 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSourceFactory.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSourceFactory.java
@@ -85,6 +85,18 @@ public class HdfsFileSourceFactory implements
TableSourceFactory {
Arrays.asList(
FileFormat.TEXT, FileFormat.JSON,
FileFormat.CSV, FileFormat.XML),
FileBaseSourceOptions.ENCODING)
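+                        // logical file split is only offered for the splittable formats below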
+ .conditional(
+ FileBaseSourceOptions.FILE_FORMAT_TYPE,
+ Arrays.asList(
+ FileFormat.TEXT,
+ FileFormat.JSON,
+ FileFormat.CSV,
+ FileFormat.PARQUET),
+ FileBaseSourceOptions.ENABLE_FILE_SPLIT)
+ .conditional(
+ FileBaseSourceOptions.ENABLE_FILE_SPLIT,
+ Boolean.TRUE,
+ FileBaseSourceOptions.FILE_SPLIT_SIZE)
.optional(FileBaseSourceOptions.PARSE_PARTITION_FROM_PATH)
.optional(FileBaseSourceOptions.DATE_FORMAT_LEGACY)
.optional(FileBaseSourceOptions.DATETIME_FORMAT_LEGACY)
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/split/HdfsFileAccordingToSplitSizeSplitStrategyTest.java
b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/split/HdfsFileAccordingToSplitSizeSplitStrategyTest.java
new file mode 100644
index 0000000000..30abb54666
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/split/HdfsFileAccordingToSplitSizeSplitStrategyTest.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.connectors.seatunnel.file.hdfs.source.split;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;
+
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+import
org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSourceOptions;
+import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.file.hdfs.config.HdfsFileHadoopConfig;
+import
org.apache.seatunnel.connectors.seatunnel.file.source.reader.TextReadStrategy;
+import
org.apache.seatunnel.connectors.seatunnel.file.source.split.AccordingToSplitSizeSplitStrategy;
+import
org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSourceSplit;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public class HdfsFileAccordingToSplitSizeSplitStrategyTest {
+
+ @TempDir private Path tempDir;
+
+ @Test
+ void testSplitNonExistingFileShouldThrowFileNotFound() throws Exception {
+ String fileUri = tempDir.resolve("not_exist.txt").toUri().toString();
+ try (AccordingToSplitSizeSplitStrategy strategy =
+ new AccordingToSplitSizeSplitStrategy(
+ new HdfsFileHadoopConfig("file:///"), "\n", 0,
"UTF-8", 6)) {
+ SeaTunnelRuntimeException ex =
+ Assertions.assertThrows(
+ SeaTunnelRuntimeException.class, () ->
strategy.split("t", fileUri));
+ Assertions.assertEquals(
+ FileConnectorErrorCode.FILE_NOT_FOUND,
ex.getSeaTunnelErrorCode());
+ }
+ }
+
+ @Test
+ void testSplitByDelimiterSeek() throws IOException {
+ Path filePath = tempDir.resolve("test.txt");
+ Files.write(filePath,
"abc\nabc\nabc\nabc\nabc\n".getBytes(StandardCharsets.UTF_8));
+
+ String fileUri = filePath.toUri().toString();
+ try (AccordingToSplitSizeSplitStrategy strategy =
+ new AccordingToSplitSizeSplitStrategy(
+ new HdfsFileHadoopConfig("file:///"), "\n", 0,
"UTF-8", 6)) {
+ List<FileSourceSplit> splits = strategy.split("t", fileUri);
+ Assertions.assertEquals(3, splits.size());
+
+ Assertions.assertEquals(0, splits.get(0).getStart());
+ Assertions.assertEquals(8, splits.get(0).getLength());
+
+ Assertions.assertEquals(8, splits.get(1).getStart());
+ Assertions.assertEquals(8, splits.get(1).getLength());
+
+ Assertions.assertEquals(16, splits.get(2).getStart());
+ Assertions.assertEquals(4, splits.get(2).getLength());
+ }
+ }
+
+ @Test
+ void testSplitWithSkipHeaderLine() throws IOException {
+ Path filePath = tempDir.resolve("with_header.txt");
+ Files.write(filePath,
"header\nabc\nabc\nabc\nabc\n".getBytes(StandardCharsets.UTF_8));
+
+ String fileUri = filePath.toUri().toString();
+ try (AccordingToSplitSizeSplitStrategy strategy =
+ new AccordingToSplitSizeSplitStrategy(
+ new HdfsFileHadoopConfig("file:///"), "\n", 1,
"UTF-8", 6)) {
+ List<FileSourceSplit> splits = strategy.split("t", fileUri);
+ Assertions.assertEquals(2, splits.size());
+
+ Assertions.assertEquals(7, splits.get(0).getStart());
+ Assertions.assertEquals(8, splits.get(0).getLength());
+
+ Assertions.assertEquals(15, splits.get(1).getStart());
+ Assertions.assertEquals(8, splits.get(1).getLength());
+ }
+ }
+
+ @Test
+ void testSplitWithCrLfDelimiter() throws IOException {
+ Path filePath = tempDir.resolve("crlf.txt");
+ Files.write(filePath,
"a\r\nb\r\nc\r\n".getBytes(StandardCharsets.UTF_8));
+
+ String fileUri = filePath.toUri().toString();
+ try (AccordingToSplitSizeSplitStrategy strategy =
+ new AccordingToSplitSizeSplitStrategy(
+ new HdfsFileHadoopConfig("file:///"), "\r\n", 0,
"UTF-8", 2)) {
+ List<FileSourceSplit> splits = strategy.split("t", fileUri);
+ Assertions.assertEquals(3, splits.size());
+
+ Assertions.assertEquals(0, splits.get(0).getStart());
+ Assertions.assertEquals(3, splits.get(0).getLength());
+
+ Assertions.assertEquals(3, splits.get(1).getStart());
+ Assertions.assertEquals(3, splits.get(1).getLength());
+
+ Assertions.assertEquals(6, splits.get(2).getStart());
+ Assertions.assertEquals(3, splits.get(2).getLength());
+ }
+ }
+
+ @Test
+ void testReadBySplitsShouldMatchFullRead() throws Exception {
+ Path filePath = tempDir.resolve("read_compare.txt");
+ List<String> lines = new ArrayList<>();
+ lines.add("header");
+ for (int i = 1; i <= 200; i++) {
+ lines.add("value-" + i);
+ }
+ Files.write(filePath, (String.join("\n", lines) +
"\n").getBytes(StandardCharsets.UTF_8));
+
+ String fileUri = filePath.toUri().toString();
+ HdfsFileHadoopConfig hadoopConf = new HdfsFileHadoopConfig("file:///");
+ String tableId = "t";
+
+ List<String> fullReadResult =
+ readByTextStrategy(
+ hadoopConf,
+ fileUri,
+ tableId,
+ Collections.singletonList(new FileSourceSplit(tableId,
fileUri)),
+ false,
+ "\n",
+ 1);
+ Assertions.assertEquals(200, fullReadResult.size());
+ Assertions.assertEquals("value-1", fullReadResult.get(0));
+
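+        // Re-read the same file through size-based splits and expect identical rows.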
+ List<FileSourceSplit> splits;
+ try (AccordingToSplitSizeSplitStrategy splitStrategy =
+ new AccordingToSplitSizeSplitStrategy(hadoopConf, "\n", 1,
"UTF-8", 64)) {
+ splits = splitStrategy.split(tableId, fileUri);
+ }
+ Assertions.assertTrue(splits.size() > 1);
+
+ List<String> splitReadResult =
+ readByTextStrategy(hadoopConf, fileUri, tableId, splits, true,
"\n", 1);
+ Assertions.assertEquals(fullReadResult, splitReadResult);
+ }
+
+ private static List<String> readByTextStrategy(
+ HdfsFileHadoopConfig hadoopConf,
+ String fileUri,
+ String tableId,
+ List<FileSourceSplit> splits,
+ boolean enableFileSplit,
+ String rowDelimiter,
+ long skipHeaderRows)
+ throws Exception {
+ Config pluginConfig =
+ ConfigFactory.empty()
+ .withValue(
+ FileBaseSourceOptions.FILE_PATH.key(),
+ ConfigValueFactory.fromAnyRef(fileUri))
+ .withValue(
+ FileBaseSourceOptions.ENABLE_FILE_SPLIT.key(),
+ ConfigValueFactory.fromAnyRef(enableFileSplit))
+ .withValue(
+ FileBaseSourceOptions.ROW_DELIMITER.key(),
+ ConfigValueFactory.fromAnyRef(rowDelimiter))
+ .withValue(
+
FileBaseSourceOptions.SKIP_HEADER_ROW_NUMBER.key(),
+ ConfigValueFactory.fromAnyRef(skipHeaderRows));
+
+ List<String> results = new ArrayList<>();
+ try (TextReadStrategy readStrategy = new TextReadStrategy()) {
+ readStrategy.setPluginConfig(pluginConfig);
+ readStrategy.init(hadoopConf);
+ readStrategy.getFileNamesByPath(fileUri);
+
readStrategy.setCatalogTable(CatalogTableUtil.buildSimpleTextTable());
+
+ FirstFieldCollector collector = new FirstFieldCollector(tableId,
results);
+ for (FileSourceSplit split : splits) {
+ readStrategy.read(split, collector);
+ }
+ }
+ return results;
+ }
+
+ private static class FirstFieldCollector implements
Collector<SeaTunnelRow> {
+ private final Object lock = new Object();
+ private final String tableId;
+ private final List<String> rows;
+
+ private FirstFieldCollector(String tableId, List<String> rows) {
+ this.tableId = tableId;
+ this.rows = rows;
+ }
+
+ @Override
+ public void collect(SeaTunnelRow record) {
+ Assertions.assertEquals(tableId, record.getTableId());
+ Object field = record.getField(0);
+ rows.add(field == null ? null : String.valueOf(field));
+ }
+
+ @Override
+ public Object getCheckpointLock() {
+ return lock;
+ }
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/split/HdfsFileSplitStrategyFactoryTest.java
b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/split/HdfsFileSplitStrategyFactoryTest.java
new file mode 100644
index 0000000000..c480698220
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/split/HdfsFileSplitStrategyFactoryTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.connectors.seatunnel.file.hdfs.source.split;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import
org.apache.seatunnel.connectors.seatunnel.file.config.ArchiveCompressFormat;
+import org.apache.seatunnel.connectors.seatunnel.file.config.CompressFormat;
+import
org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSourceOptions;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
+import
org.apache.seatunnel.connectors.seatunnel.file.hdfs.config.HdfsFileHadoopConfig;
+import
org.apache.seatunnel.connectors.seatunnel.file.hdfs.source.config.HdfsSourceConfigOptions;
+import
org.apache.seatunnel.connectors.seatunnel.file.source.split.AccordingToSplitSizeSplitStrategy;
+import
org.apache.seatunnel.connectors.seatunnel.file.source.split.DefaultFileSplitStrategy;
+import
org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSplitStrategy;
+import
org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSplitStrategyFactory;
+import
org.apache.seatunnel.connectors.seatunnel.file.source.split.ParquetFileSplitStrategy;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.io.Closeable;
+import java.util.HashMap;
+import java.util.Map;
+
+public class HdfsFileSplitStrategyFactoryTest {
+
+ @Test
+ void testInitFileSplitStrategy() {
+ HdfsFileHadoopConfig hadoopConf = new HdfsFileHadoopConfig("file:///");
+
+ Map<String, Object> map = baseConfig(FileFormat.ORC);
+ map.put(FileBaseSourceOptions.ENABLE_FILE_SPLIT.key(), true);
+ FileSplitStrategy fileSplitStrategy =
+ FileSplitStrategyFactory.initFileSplitStrategy(
+ ReadonlyConfig.fromMap(map), hadoopConf);
+ Assertions.assertInstanceOf(DefaultFileSplitStrategy.class,
fileSplitStrategy);
+ closeQuietly(fileSplitStrategy);
+
+ Map<String, Object> map1 = baseConfig(FileFormat.TEXT);
+ fileSplitStrategy =
+ FileSplitStrategyFactory.initFileSplitStrategy(
+ ReadonlyConfig.fromMap(map1), hadoopConf);
+ Assertions.assertInstanceOf(DefaultFileSplitStrategy.class,
fileSplitStrategy);
+ closeQuietly(fileSplitStrategy);
+
+ Map<String, Object> map2 = baseConfig(FileFormat.TEXT);
+ map2.put(FileBaseSourceOptions.ENABLE_FILE_SPLIT.key(), true);
+ fileSplitStrategy =
+ FileSplitStrategyFactory.initFileSplitStrategy(
+ ReadonlyConfig.fromMap(map2), hadoopConf);
+ Assertions.assertInstanceOf(AccordingToSplitSizeSplitStrategy.class,
fileSplitStrategy);
+ closeQuietly(fileSplitStrategy);
+
+ Map<String, Object> map3 = baseConfig(FileFormat.CSV);
+ map3.put(FileBaseSourceOptions.ENABLE_FILE_SPLIT.key(), true);
+ fileSplitStrategy =
+ FileSplitStrategyFactory.initFileSplitStrategy(
+ ReadonlyConfig.fromMap(map3), hadoopConf);
+ Assertions.assertInstanceOf(AccordingToSplitSizeSplitStrategy.class,
fileSplitStrategy);
+ closeQuietly(fileSplitStrategy);
+
+ Map<String, Object> map4 = baseConfig(FileFormat.JSON);
+ map4.put(FileBaseSourceOptions.ENABLE_FILE_SPLIT.key(), true);
+ fileSplitStrategy =
+ FileSplitStrategyFactory.initFileSplitStrategy(
+ ReadonlyConfig.fromMap(map4), hadoopConf);
+ Assertions.assertInstanceOf(AccordingToSplitSizeSplitStrategy.class,
fileSplitStrategy);
+ closeQuietly(fileSplitStrategy);
+
+ Map<String, Object> map5 = baseConfig(FileFormat.PARQUET);
+ map5.put(FileBaseSourceOptions.ENABLE_FILE_SPLIT.key(), true);
+ fileSplitStrategy =
+ FileSplitStrategyFactory.initFileSplitStrategy(
+ ReadonlyConfig.fromMap(map5), hadoopConf);
+ Assertions.assertInstanceOf(ParquetFileSplitStrategy.class,
fileSplitStrategy);
+ closeQuietly(fileSplitStrategy);
+
+ Map<String, Object> map6 = baseConfig(FileFormat.PARQUET);
+ map6.put(FileBaseSourceOptions.ENABLE_FILE_SPLIT.key(), true);
+ map6.put(FileBaseSourceOptions.COMPRESS_CODEC.key(),
CompressFormat.LZO);
+ map6.put(FileBaseSourceOptions.ARCHIVE_COMPRESS_CODEC.key(),
ArchiveCompressFormat.NONE);
+ fileSplitStrategy =
+ FileSplitStrategyFactory.initFileSplitStrategy(
+ ReadonlyConfig.fromMap(map6), hadoopConf);
+ Assertions.assertInstanceOf(DefaultFileSplitStrategy.class,
fileSplitStrategy);
+ closeQuietly(fileSplitStrategy);
+ }
+
+ private Map<String, Object> baseConfig(FileFormat fileFormat) {
+ Map<String, Object> map = new HashMap<>();
+ map.put(FileBaseSourceOptions.FILE_FORMAT_TYPE.key(), fileFormat);
+ map.put(HdfsSourceConfigOptions.DEFAULT_FS.key(), "file:///");
+ return map;
+ }
+
+ private void closeQuietly(FileSplitStrategy strategy) {
+ try {
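+            // A strategy may implement Closeable or only AutoCloseable; handle both quietly.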
+ if (strategy instanceof Closeable) {
+ ((Closeable) strategy).close();
+ return;
+ }
+ if (strategy instanceof AutoCloseable) {
+ ((AutoCloseable) strategy).close();
+ }
+ } catch (Exception ignored) {
+ // ignore
+ }
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java
index bf3311efab..fd5e24d6ff 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java
@@ -20,15 +20,16 @@ package
org.apache.seatunnel.connectors.seatunnel.file.local.source;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
import
org.apache.seatunnel.connectors.seatunnel.file.local.source.config.MultipleTableLocalFileSourceConfig;
-import
org.apache.seatunnel.connectors.seatunnel.file.local.source.split.LocalFileSplitStrategyFactory;
import
org.apache.seatunnel.connectors.seatunnel.file.source.BaseMultipleTableFileSource;
public class LocalFileSource extends BaseMultipleTableFileSource {
public LocalFileSource(ReadonlyConfig readonlyConfig) {
- super(
- new MultipleTableLocalFileSourceConfig(readonlyConfig),
-
LocalFileSplitStrategyFactory.initFileSplitStrategy(readonlyConfig));
+ this(new MultipleTableLocalFileSourceConfig(readonlyConfig));
+ }
+
+ private LocalFileSource(MultipleTableLocalFileSourceConfig sourceConfig) {
+ super(sourceConfig, initFileSplitStrategy(sourceConfig));
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/split/LocalFileAccordingToSplitSizeSplitStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/split/LocalFileAccordingToSplitSizeSplitStrategy.java
index 3a6cb18b13..10faa8a9c6 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/split/LocalFileAccordingToSplitSizeSplitStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/split/LocalFileAccordingToSplitSizeSplitStrategy.java
@@ -16,44 +16,25 @@
*/
package org.apache.seatunnel.connectors.seatunnel.file.local.source.split;
+import
org.apache.seatunnel.connectors.seatunnel.file.local.config.LocalFileHadoopConf;
import
org.apache.seatunnel.connectors.seatunnel.file.source.split.AccordingToSplitSizeSplitStrategy;
-import java.io.BufferedInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URI;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-
+/**
+ * Compatibility adapter for historical local-file split strategy.
+ *
+ * @deprecated Use {@link AccordingToSplitSizeSplitStrategy} via {@link
+ *
org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSplitStrategyFactory}.
+ */
+@Deprecated
public class LocalFileAccordingToSplitSizeSplitStrategy extends
AccordingToSplitSizeSplitStrategy {
public LocalFileAccordingToSplitSizeSplitStrategy(
String rowDelimiter, long skipHeaderRowNumber, String
encodingName, long splitSize) {
- super(rowDelimiter, skipHeaderRowNumber, encodingName, splitSize);
- }
-
- @Override
- protected InputStream getInputStream(String filePath) throws IOException {
- Path path = toLocalNioPath(filePath);
- return new BufferedInputStream(Files.newInputStream(path));
- }
-
- @Override
- protected long getFileSize(String filePath) throws IOException {
- Path path = toLocalNioPath(filePath);
- return Files.size(path);
- }
-
- private static Path toLocalNioPath(String filePath) {
- try {
- URI uri = URI.create(filePath);
- if ("file".equalsIgnoreCase(uri.getScheme())) {
- return Paths.get(uri);
- }
- } catch (Exception ignored) {
- // ignore malformed URI
- }
- return Paths.get(filePath);
+ super(
+ new LocalFileHadoopConf(),
+ rowDelimiter,
+ skipHeaderRowNumber,
+ encodingName,
+ splitSize);
}
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/local/LocalFileSourceTest.java
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/local/LocalFileSourceTest.java
index 379f2303a5..a64af3b81a 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/local/LocalFileSourceTest.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/local/LocalFileSourceTest.java
@@ -21,15 +21,17 @@ import
org.apache.seatunnel.connectors.seatunnel.file.config.ArchiveCompressForm
import org.apache.seatunnel.connectors.seatunnel.file.config.CompressFormat;
import
org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSourceOptions;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
-import
org.apache.seatunnel.connectors.seatunnel.file.local.source.split.LocalFileAccordingToSplitSizeSplitStrategy;
-import
org.apache.seatunnel.connectors.seatunnel.file.local.source.split.LocalFileSplitStrategyFactory;
+import
org.apache.seatunnel.connectors.seatunnel.file.local.config.LocalFileHadoopConf;
+import
org.apache.seatunnel.connectors.seatunnel.file.source.split.AccordingToSplitSizeSplitStrategy;
import
org.apache.seatunnel.connectors.seatunnel.file.source.split.DefaultFileSplitStrategy;
import
org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSplitStrategy;
+import
org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSplitStrategyFactory;
import
org.apache.seatunnel.connectors.seatunnel.file.source.split.ParquetFileSplitStrategy;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import java.io.Closeable;
import java.util.HashMap;
import java.util.Map;
@@ -42,45 +44,54 @@ public class LocalFileSourceTest {
map.put(FileBaseSourceOptions.FILE_FORMAT_TYPE.key(), FileFormat.ORC);
map.put(FileBaseSourceOptions.ENABLE_FILE_SPLIT.key(), true);
FileSplitStrategy fileSplitStrategy =
-
LocalFileSplitStrategyFactory.initFileSplitStrategy(ReadonlyConfig.fromMap(map));
+ FileSplitStrategyFactory.initFileSplitStrategy(
+ ReadonlyConfig.fromMap(map), new
LocalFileHadoopConf());
Assertions.assertInstanceOf(DefaultFileSplitStrategy.class,
fileSplitStrategy);
+ closeQuietly(fileSplitStrategy);
// test text, no split
Map<String, Object> map1 = new HashMap<>();
map1.put(FileBaseSourceOptions.FILE_FORMAT_TYPE.key(),
FileFormat.TEXT);
fileSplitStrategy =
-
LocalFileSplitStrategyFactory.initFileSplitStrategy(ReadonlyConfig.fromMap(map1));
+ FileSplitStrategyFactory.initFileSplitStrategy(
+ ReadonlyConfig.fromMap(map1), new
LocalFileHadoopConf());
Assertions.assertInstanceOf(DefaultFileSplitStrategy.class,
fileSplitStrategy);
+ closeQuietly(fileSplitStrategy);
// test text, split
Map<String, Object> map2 = new HashMap<>();
map2.put(FileBaseSourceOptions.FILE_FORMAT_TYPE.key(),
FileFormat.TEXT);
map2.put(FileBaseSourceOptions.ENABLE_FILE_SPLIT.key(), true);
fileSplitStrategy =
-
LocalFileSplitStrategyFactory.initFileSplitStrategy(ReadonlyConfig.fromMap(map2));
- Assertions.assertInstanceOf(
- LocalFileAccordingToSplitSizeSplitStrategy.class,
fileSplitStrategy);
+ FileSplitStrategyFactory.initFileSplitStrategy(
+ ReadonlyConfig.fromMap(map2), new
LocalFileHadoopConf());
+ Assertions.assertInstanceOf(AccordingToSplitSizeSplitStrategy.class,
fileSplitStrategy);
+ closeQuietly(fileSplitStrategy);
// test csv, split
Map<String, Object> map3 = new HashMap<>();
map3.put(FileBaseSourceOptions.FILE_FORMAT_TYPE.key(), FileFormat.CSV);
map3.put(FileBaseSourceOptions.ENABLE_FILE_SPLIT.key(), true);
fileSplitStrategy =
-
LocalFileSplitStrategyFactory.initFileSplitStrategy(ReadonlyConfig.fromMap(map3));
- Assertions.assertInstanceOf(
- LocalFileAccordingToSplitSizeSplitStrategy.class,
fileSplitStrategy);
+ FileSplitStrategyFactory.initFileSplitStrategy(
+ ReadonlyConfig.fromMap(map3), new
LocalFileHadoopConf());
+ Assertions.assertInstanceOf(AccordingToSplitSizeSplitStrategy.class,
fileSplitStrategy);
+ closeQuietly(fileSplitStrategy);
// test json, split
Map<String, Object> map4 = new HashMap<>();
map4.put(FileBaseSourceOptions.FILE_FORMAT_TYPE.key(),
FileFormat.JSON);
map4.put(FileBaseSourceOptions.ENABLE_FILE_SPLIT.key(), true);
fileSplitStrategy =
-
LocalFileSplitStrategyFactory.initFileSplitStrategy(ReadonlyConfig.fromMap(map4));
- Assertions.assertInstanceOf(
- LocalFileAccordingToSplitSizeSplitStrategy.class,
fileSplitStrategy);
+ FileSplitStrategyFactory.initFileSplitStrategy(
+ ReadonlyConfig.fromMap(map4), new
LocalFileHadoopConf());
+ Assertions.assertInstanceOf(AccordingToSplitSizeSplitStrategy.class,
fileSplitStrategy);
+ closeQuietly(fileSplitStrategy);
// test parquet, split
Map<String, Object> map5 = new HashMap<>();
map5.put(FileBaseSourceOptions.FILE_FORMAT_TYPE.key(),
FileFormat.PARQUET);
map5.put(FileBaseSourceOptions.ENABLE_FILE_SPLIT.key(), true);
fileSplitStrategy =
-
LocalFileSplitStrategyFactory.initFileSplitStrategy(ReadonlyConfig.fromMap(map5));
+ FileSplitStrategyFactory.initFileSplitStrategy(
+ ReadonlyConfig.fromMap(map5), new
LocalFileHadoopConf());
Assertions.assertInstanceOf(ParquetFileSplitStrategy.class,
fileSplitStrategy);
+ closeQuietly(fileSplitStrategy);
// test compress 1
Map<String, Object> map6 = new HashMap<>();
map6.put(FileBaseSourceOptions.FILE_FORMAT_TYPE.key(),
FileFormat.PARQUET);
@@ -88,8 +99,10 @@ public class LocalFileSourceTest {
map6.put(FileBaseSourceOptions.COMPRESS_CODEC.key(),
CompressFormat.LZO);
map6.put(FileBaseSourceOptions.ARCHIVE_COMPRESS_CODEC.key(),
ArchiveCompressFormat.NONE);
fileSplitStrategy =
-
LocalFileSplitStrategyFactory.initFileSplitStrategy(ReadonlyConfig.fromMap(map6));
+ FileSplitStrategyFactory.initFileSplitStrategy(
+ ReadonlyConfig.fromMap(map6), new
LocalFileHadoopConf());
Assertions.assertInstanceOf(DefaultFileSplitStrategy.class,
fileSplitStrategy);
+ closeQuietly(fileSplitStrategy);
// test compress 2
Map<String, Object> map7 = new HashMap<>();
map7.put(FileBaseSourceOptions.FILE_FORMAT_TYPE.key(),
FileFormat.PARQUET);
@@ -97,7 +110,23 @@ public class LocalFileSourceTest {
map7.put(FileBaseSourceOptions.COMPRESS_CODEC.key(),
CompressFormat.NONE);
map7.put(FileBaseSourceOptions.ARCHIVE_COMPRESS_CODEC.key(),
ArchiveCompressFormat.NONE);
fileSplitStrategy =
-
LocalFileSplitStrategyFactory.initFileSplitStrategy(ReadonlyConfig.fromMap(map7));
+ FileSplitStrategyFactory.initFileSplitStrategy(
+ ReadonlyConfig.fromMap(map7), new
LocalFileHadoopConf());
Assertions.assertInstanceOf(ParquetFileSplitStrategy.class,
fileSplitStrategy);
+ closeQuietly(fileSplitStrategy);
+ }
+
+ private void closeQuietly(FileSplitStrategy strategy) {
+ try {
+ if (strategy instanceof Closeable) {
+ ((Closeable) strategy).close();
+ return;
+ }
+ if (strategy instanceof AutoCloseable) {
+ ((AutoCloseable) strategy).close();
+ }
+ } catch (Exception ignored) {
+ // ignore
+ }
}
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/local/SplitFileStrategyTest.java
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/local/SplitFileStrategyTest.java
index c47aa7214d..e2c1f1a09c 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/local/SplitFileStrategyTest.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/local/SplitFileStrategyTest.java
@@ -16,7 +16,8 @@
*/
package org.apache.seatunnel.connectors.seatunnel.file.local;
-import
org.apache.seatunnel.connectors.seatunnel.file.local.source.split.LocalFileAccordingToSplitSizeSplitStrategy;
+import
org.apache.seatunnel.connectors.seatunnel.file.local.config.LocalFileHadoopConf;
+import
org.apache.seatunnel.connectors.seatunnel.file.source.split.AccordingToSplitSizeSplitStrategy;
import
org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSourceSplit;
import org.junit.jupiter.api.Assertions;
@@ -39,18 +40,21 @@ public class SplitFileStrategyTest {
@SneakyThrows
@Test
public void testSplitNoSkipHeader() {
- final LocalFileAccordingToSplitSizeSplitStrategy
localFileSplitStrategy =
- new LocalFileAccordingToSplitSizeSplitStrategy("\n", 0L,
"utf-8", 100L);
URL url =
getClass().getClassLoader().getResource("test_split_csv_data.csv");
String realPath = Paths.get(url.toURI()).toString();
- final List<FileSourceSplit> splits =
localFileSplitStrategy.split("test.table", realPath);
- Assertions.assertEquals(2, splits.size());
- // check split-1
- Assertions.assertEquals(0, splits.get(0).getStart());
- Assertions.assertEquals(105, splits.get(0).getLength());
- // check split-2
- Assertions.assertEquals(105, splits.get(1).getStart());
- Assertions.assertEquals(85, splits.get(1).getLength());
+ try (AccordingToSplitSizeSplitStrategy localFileSplitStrategy =
+ new AccordingToSplitSizeSplitStrategy(
+ new LocalFileHadoopConf(), "\n", 0L, "utf-8", 100L)) {
+ final List<FileSourceSplit> splits =
+ localFileSplitStrategy.split("test.table", realPath);
+ Assertions.assertEquals(2, splits.size());
+ // check split-1
+ Assertions.assertEquals(0, splits.get(0).getStart());
+ Assertions.assertEquals(105, splits.get(0).getLength());
+ // check split-2
+ Assertions.assertEquals(105, splits.get(1).getStart());
+ Assertions.assertEquals(85, splits.get(1).getLength());
+ }
}
@DisabledOnOs(
@@ -60,24 +64,27 @@ public class SplitFileStrategyTest {
@SneakyThrows
@Test
public void testSplitSkipHeader() {
- final LocalFileAccordingToSplitSizeSplitStrategy
localFileSplitStrategy =
- new LocalFileAccordingToSplitSizeSplitStrategy("\n", 1L,
"utf-8", 30L);
URL url =
getClass().getClassLoader().getResource("test_split_csv_data.csv");
String realPath = Paths.get(url.toURI()).toString();
- final List<FileSourceSplit> splits =
localFileSplitStrategy.split("test.table", realPath);
- Assertions.assertEquals(4, splits.size());
- // check split-1
- Assertions.assertEquals(21, splits.get(0).getStart());
- Assertions.assertEquals(41, splits.get(0).getLength());
- // check split-2
- Assertions.assertEquals(62, splits.get(1).getStart());
- Assertions.assertEquals(43, splits.get(1).getLength());
- // check split-3
- Assertions.assertEquals(105, splits.get(2).getStart());
- Assertions.assertEquals(43, splits.get(2).getLength());
- // check split-4
- Assertions.assertEquals(148, splits.get(3).getStart());
- Assertions.assertEquals(42, splits.get(3).getLength());
+ try (AccordingToSplitSizeSplitStrategy localFileSplitStrategy =
+ new AccordingToSplitSizeSplitStrategy(
+ new LocalFileHadoopConf(), "\n", 1L, "utf-8", 30L)) {
+ final List<FileSourceSplit> splits =
+ localFileSplitStrategy.split("test.table", realPath);
+ Assertions.assertEquals(4, splits.size());
+ // check split-1
+ Assertions.assertEquals(21, splits.get(0).getStart());
+ Assertions.assertEquals(41, splits.get(0).getLength());
+ // check split-2
+ Assertions.assertEquals(62, splits.get(1).getStart());
+ Assertions.assertEquals(43, splits.get(1).getLength());
+ // check split-3
+ Assertions.assertEquals(105, splits.get(2).getStart());
+ Assertions.assertEquals(43, splits.get(2).getLength());
+ // check split-4
+ Assertions.assertEquals(148, splits.get(3).getStart());
+ Assertions.assertEquals(42, splits.get(3).getLength());
+ }
}
@DisabledOnOs(
@@ -87,15 +94,18 @@ public class SplitFileStrategyTest {
@SneakyThrows
@Test
public void testSplitSkipHeaderLargeSize() {
- final LocalFileAccordingToSplitSizeSplitStrategy
localFileSplitStrategy =
- new LocalFileAccordingToSplitSizeSplitStrategy("\n", 1L,
"utf-8", 300L);
URL url =
getClass().getClassLoader().getResource("test_split_csv_data.csv");
String realPath = Paths.get(url.toURI()).toString();
- final List<FileSourceSplit> splits =
localFileSplitStrategy.split("test.table", realPath);
- Assertions.assertEquals(1, splits.size());
- // check split-1
- Assertions.assertEquals(21, splits.get(0).getStart());
- Assertions.assertEquals(169, splits.get(0).getLength());
+ try (AccordingToSplitSizeSplitStrategy localFileSplitStrategy =
+ new AccordingToSplitSizeSplitStrategy(
+ new LocalFileHadoopConf(), "\n", 1L, "utf-8", 300L)) {
+ final List<FileSourceSplit> splits =
+ localFileSplitStrategy.split("test.table", realPath);
+ Assertions.assertEquals(1, splits.size());
+ // check split-1
+ Assertions.assertEquals(21, splits.get(0).getStart());
+ Assertions.assertEquals(169, splits.get(0).getLength());
+ }
}
@DisabledOnOs(
@@ -105,51 +115,60 @@ public class SplitFileStrategyTest {
@SneakyThrows
@Test
public void testSplitSkipHeaderSmallSize() {
- final LocalFileAccordingToSplitSizeSplitStrategy
localFileSplitStrategy =
- new LocalFileAccordingToSplitSizeSplitStrategy("\n", 1L,
"utf-8", 3L);
URL url =
getClass().getClassLoader().getResource("test_split_csv_data.csv");
String realPath = Paths.get(url.toURI()).toString();
- final List<FileSourceSplit> splits =
localFileSplitStrategy.split("test.table", realPath);
- Assertions.assertEquals(8, splits.size());
- // check split
- Assertions.assertEquals(21, splits.get(0).getStart());
- Assertions.assertEquals(42, splits.get(1).getStart());
- Assertions.assertEquals(62, splits.get(2).getStart());
- Assertions.assertEquals(82, splits.get(3).getStart());
- Assertions.assertEquals(105, splits.get(4).getStart());
- Assertions.assertEquals(126, splits.get(5).getStart());
- Assertions.assertEquals(148, splits.get(6).getStart());
- Assertions.assertEquals(169, splits.get(7).getStart());
+ try (AccordingToSplitSizeSplitStrategy localFileSplitStrategy =
+ new AccordingToSplitSizeSplitStrategy(
+ new LocalFileHadoopConf(), "\n", 1L, "utf-8", 3L)) {
+ final List<FileSourceSplit> splits =
+ localFileSplitStrategy.split("test.table", realPath);
+ Assertions.assertEquals(8, splits.size());
+ // check split
+ Assertions.assertEquals(21, splits.get(0).getStart());
+ Assertions.assertEquals(42, splits.get(1).getStart());
+ Assertions.assertEquals(62, splits.get(2).getStart());
+ Assertions.assertEquals(82, splits.get(3).getStart());
+ Assertions.assertEquals(105, splits.get(4).getStart());
+ Assertions.assertEquals(126, splits.get(5).getStart());
+ Assertions.assertEquals(148, splits.get(6).getStart());
+ Assertions.assertEquals(169, splits.get(7).getStart());
+ }
}
@SneakyThrows
@Test
public void testSplitSkipHeaderSpecialRowDelimiter() {
- final LocalFileAccordingToSplitSizeSplitStrategy
localFileSplitStrategy =
- new LocalFileAccordingToSplitSizeSplitStrategy("|^|", 1L,
"utf-8", 80L);
URL url =
getClass()
.getClassLoader()
.getResource("test_split_special_row_delimiter_data.txt");
String realPath = Paths.get(url.toURI()).toString();
- final List<FileSourceSplit> splits =
localFileSplitStrategy.split("test.table", realPath);
- Assertions.assertEquals(2, splits.size());
- // check split-1
- Assertions.assertEquals(23, splits.get(0).getStart());
- Assertions.assertEquals(92, splits.get(0).getLength());
- // check split-2
- Assertions.assertEquals(115, splits.get(1).getStart());
- Assertions.assertEquals(91, splits.get(1).getLength());
+ try (AccordingToSplitSizeSplitStrategy localFileSplitStrategy =
+ new AccordingToSplitSizeSplitStrategy(
+ new LocalFileHadoopConf(), "|^|", 1L, "utf-8", 80L)) {
+ final List<FileSourceSplit> splits =
+ localFileSplitStrategy.split("test.table", realPath);
+ Assertions.assertEquals(2, splits.size());
+ // check split-1
+ Assertions.assertEquals(23, splits.get(0).getStart());
+ Assertions.assertEquals(92, splits.get(0).getLength());
+ // check split-2
+ Assertions.assertEquals(115, splits.get(1).getStart());
+ Assertions.assertEquals(91, splits.get(1).getLength());
+ }
}
@SneakyThrows
@Test
public void testSplitEmpty() {
- final LocalFileAccordingToSplitSizeSplitStrategy
localFileSplitStrategy =
- new LocalFileAccordingToSplitSizeSplitStrategy("\n", 1L,
"utf-8", 300L);
URL url =
getClass().getClassLoader().getResource("test_split_empty_data.csv");
String realPath = Paths.get(url.toURI()).toString();
- final List<FileSourceSplit> splits =
localFileSplitStrategy.split("test.table", realPath);
- Assertions.assertEquals(0, splits.size());
+ try (AccordingToSplitSizeSplitStrategy localFileSplitStrategy =
+ new AccordingToSplitSizeSplitStrategy(
+ new LocalFileHadoopConf(), "\n", 1L, "utf-8", 300L)) {
+ final List<FileSourceSplit> splits =
+ localFileSplitStrategy.split("test.table", realPath);
+ Assertions.assertEquals(0, splits.size());
+ }
}
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-hadoop-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/hdfs/HdfsFileIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-hadoop-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/hdfs/HdfsFileIT.java
index 015ec79690..40bbdb9d8e 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-hadoop-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/hdfs/HdfsFileIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-hadoop-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/hdfs/HdfsFileIT.java
@@ -129,6 +129,28 @@ public class HdfsFileIT extends TestSuiteBase implements
TestResource {
Assertions.assertEquals(0, readResult.getExitCode());
}
+ @TestTemplate
+ public void testHdfsParquetReadWithFileSplit(TestContainer container)
+ throws IOException, InterruptedException {
+ org.testcontainers.containers.Container.ExecResult writeResult =
+ container.executeJob("/fake_to_hdfs_normal.conf");
+ Assertions.assertEquals(0, writeResult.getExitCode());
+ org.testcontainers.containers.Container.ExecResult readResult =
+ container.executeJob("/hdfs_parquet_split_to_assert.conf");
+ Assertions.assertEquals(0, readResult.getExitCode());
+ }
+
+ @TestTemplate
+ public void testHdfsTextReadWithFileSplit(TestContainer container)
+ throws IOException, InterruptedException {
+ resetSplitTestPath();
+ putHdfsSequentialLinesFile("/split/input/test.txt", 1000);
+
+ org.testcontainers.containers.Container.ExecResult readResult =
+ container.executeJob("/hdfs_text_split_to_assert.conf");
+ Assertions.assertEquals(0, readResult.getExitCode());
+ }
+
@TestTemplate
public void testHdfsReadEmptyTextDirectory(TestContainer container)
throws IOException, InterruptedException {
@@ -195,6 +217,13 @@ public class HdfsFileIT extends TestSuiteBase implements
TestResource {
Assertions.assertEquals(0, mkdirResult.getExitCode());
}
+ private void resetSplitTestPath() throws IOException, InterruptedException
{
+ nameNode.execInContainer("bash", "-c", "hdfs dfs -rm -r -f /split ||
true");
+ org.testcontainers.containers.Container.ExecResult mkdirResult =
+ nameNode.execInContainer("hdfs", "dfs", "-mkdir", "-p",
"/split/input");
+ Assertions.assertEquals(0, mkdirResult.getExitCode());
+ }
+
private void putHdfsFile(String hdfsPath, String content)
throws IOException, InterruptedException {
String command = "printf '" + content + "' | hdfs dfs -put -f - " +
hdfsPath;
@@ -203,6 +232,18 @@ public class HdfsFileIT extends TestSuiteBase implements
TestResource {
Assertions.assertEquals(0, putResult.getExitCode());
}
+ private void putHdfsSequentialLinesFile(String hdfsPath, int lineCount)
+ throws IOException, InterruptedException {
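+        // Generate the integers 1..lineCount, one per line, and stream them into HDFS.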
+ String command =
+ "i=1; while [ $i -le "
+ + lineCount
+ + " ]; do echo $i; i=$((i+1)); done | hdfs dfs -put -f
- "
+ + hdfsPath;
+ org.testcontainers.containers.Container.ExecResult putResult =
+ nameNode.execInContainer("bash", "-c", command);
+ Assertions.assertEquals(0, putResult.getExitCode());
+ }
+
private String readHdfsFile(String hdfsPath) throws IOException,
InterruptedException {
org.testcontainers.containers.Container.ExecResult catResult =
nameNode.execInContainer("hdfs", "dfs", "-cat", hdfsPath);
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-hadoop-e2e/src/test/resources/hdfs_parquet_split_to_assert.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-hadoop-e2e/src/test/resources/hdfs_parquet_split_to_assert.conf
new file mode 100644
index 0000000000..f5ac87c023
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-hadoop-e2e/src/test/resources/hdfs_parquet_split_to_assert.conf
@@ -0,0 +1,84 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ HdfsFile {
+ fs.defaultFS = "hdfs://namenode1:9000"
+ path = "/normal/output"
+ file_format_type = "parquet"
+ enable_file_split = true
+ file_split_size = 1024
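+    # small split size so the parquet output is read through multiple RowGroup-aligned splits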
+ schema = {
+ fields {
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_float = float
+ c_double = double
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ }
+ }
+ hadoop_conf = {
+ "dfs.replication" = 1
+ }
+ }
+}
+
+sink {
+ Assert {
+ rules {
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 100
+ },
+ {
+ rule_type = MIN_ROW
+ rule_value = 100
+ }
+ ]
+ field_rules = [
+ {
+ field_name = c_string
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = c_boolean
+ field_type = boolean
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-hadoop-e2e/src/test/resources/hdfs_text_split_to_assert.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-hadoop-e2e/src/test/resources/hdfs_text_split_to_assert.conf
new file mode 100644
index 0000000000..91e2c52552
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-hadoop-e2e/src/test/resources/hdfs_text_split_to_assert.conf
@@ -0,0 +1,58 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  # NOTE: Spark runs this E2E with `--master local` (single thread). The Assert sink
+  # checks row rules per task commit, so running with parallelism > 1 could trigger
+  # validation before all partitions finish.
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ HdfsFile {
+ fs.defaultFS = "hdfs://namenode1:9000"
+ path = "/split/input/test.txt"
+ file_format_type = "text"
+ enable_file_split = true
+ file_split_size = 20
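+    # tiny split size forces many splits; each split end is aligned to the next row delimiter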
+ schema = {
+ fields {
+ line = string
+ }
+ }
+ hadoop_conf = {
+ "dfs.replication" = 1
+ }
+ }
+}
+
+sink {
+ Assert {
+ rules {
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 1000
+ },
+ {
+ rule_type = MIN_ROW
+ rule_value = 1000
+ }
+ ]
+ }
+ }
+}