yzeng1618 commented on PR #10332:
URL: https://github.com/apache/seatunnel/pull/10332#issuecomment-3799270958
> ### Issue 1: Binary Compatibility Break - Removal of LocalFileAccordingToSplitSizeSplitStrategy
> **Location**:
>
> * `seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/split/LocalFileAccordingToSplitSizeSplitStrategy.java` (deleted)
>
> **Related Context**:
>
> * Original class: `LocalFileAccordingToSplitSizeSplitStrategy` extends `AccordingToSplitSizeSplitStrategy`
> * Replaced by: Direct use of `AccordingToSplitSizeSplitStrategy` + `LocalFileHadoopConfig`
>
> **Problem Description**: The PR deleted the `LocalFileAccordingToSplitSizeSplitStrategy` class. If external users or third-party Connectors inherit from this class, compilation errors or a runtime `ClassNotFoundException` will occur after upgrading.
>
> **Potential Risks**:
>
> * **Risk 1**: External inheritors will fail to compile (if they depend on the source)
> * **Risk 2**: Third-party Connectors compiled against old versions will hit a runtime `ClassNotFoundException` (if they depend on the binary)
> * **Risk 3**: Although the File Connector is an internal module, there is no guarantee that users haven't extended it
>
> **Impact Scope**:
>
> * **Direct Impact**: None (no inheritance within SeaTunnel)
> * **Indirect Impact**: Possible third-party Connectors
> * **Affected Area**: LocalFile Connector
>
> **Severity**: MINOR
>
> **Improvement Suggestions**:
>
> ```java
> // Keep the old class as a deprecated adapter
> @Deprecated
> public class LocalFileAccordingToSplitSizeSplitStrategy extends AccordingToSplitSizeSplitStrategy {
>     public LocalFileAccordingToSplitSizeSplitStrategy(
>             String rowDelimiter, long skipHeaderRowNumber, String encodingName, long splitSize) {
>         super(new LocalFileHadoopConfig(), rowDelimiter, skipHeaderRowNumber, encodingName, splitSize);
>     }
> }
> ```
>
> **Rationale**: Keep one version as a deprecated adapter to give users migration time, and completely remove it in version 2.8.0.
>
> ### Issue 2: Boundary Condition Defect - delimiterBytes.length == 0 causes seek failure
> **Location**: `AccordingToSplitSizeSplitStrategy.java:207-209`
>
> ```java
> long scanStart = Math.max(0, startPos - (delimiterBytes.length - 1));
> input.seek(scanStart);
> ```
>
> **Related Context**:
>
> * Constructor validation: `if (rowDelimiter == null || rowDelimiter.isEmpty())` guards against null and, via `isEmpty()`, against a zero-length delimiter
> * Since `"".isEmpty() == true`, the constructor does intercept the empty case, but the split algorithm relies on this only implicitly
>
> **Problem Description**: Although the constructor intercepts `rowDelimiter.isEmpty()`, the algorithm in `findNextDelimiterWithSeek` assumes `delimiterBytes.length >= 1`. If someone relaxes the constructor validation in the future (e.g., allowing an empty delimiter to mean "no splitting"), a bug will surface here.
>
> More precisely: when `delimiterBytes.length == 1`, `scanStart = startPos - 0 = startPos` and the algorithm is correct; for multi-byte delimiters (already supported), the algorithm relies on `scanStart` being less than `startPos` so that a delimiter straddling the split boundary is still detected.
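>
> As a quick illustration of why the scan backs up by `delimiterBytes.length - 1` bytes, here is a self-contained sketch (not the connector's code; the data and class name are made up):
>
> ```java
> import java.nio.charset.StandardCharsets;
>
> // Sketch only: the boundary math behind scanStart = startPos - (length - 1).
> public class SplitBoundarySketch {
>     public static void main(String[] args) {
>         // Imagine file bytes "abc\r\ndef" with a 2-byte delimiter "\r\n".
>         byte[] delimiter = "\r\n".getBytes(StandardCharsets.UTF_8);
>         long startPos = 4; // the split boundary falls between '\r' (index 3) and '\n' (index 4)
>         long scanStart = Math.max(0, startPos - (delimiter.length - 1)); // 3
>         // Scanning from index 3 sees the full "\r\n"; scanning from index 4 would only
>         // see '\n', and the delimiter straddling the boundary would be missed.
>         System.out.println("scanStart = " + scanStart);
>     }
> }
> ```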
>
> **Potential Risks**:
>
> * **Risk 1**: If the constructor validation is bypassed (e.g., via reflection or deserialization), `delimiterBytes.length == 0` makes `scanStart = startPos - (-1) = startPos + 1`, so the seek can go past the end of the file
> * **Risk 2**: Maintainers may not understand why the code subtracts `length - 1` and mistakenly change it to another value
>
> **Impact Scope**:
>
> * **Direct Impact**: `AccordingToSplitSizeSplitStrategy.findNextDelimiterWithSeek()`
> * **Indirect Impact**: All scenarios using text/csv/json splitting
> * **Affected Area**: HdfsFile, LocalFile
>
> **Severity**: MINOR
>
> **Improvement Suggestions**:
>
> ```java
> private long findNextDelimiterWithSeek(FSDataInputStream input, long startPos, long fileSize)
>         throws IOException {
>     // Explicit assertion to make the algorithm's assumption visible
>     if (delimiterBytes.length == 0) {
>         throw new IllegalStateException("delimiterBytes must not be empty");
>     }
>
>     // Back up by (length - 1) bytes so a delimiter straddling the boundary is still found
>     long scanStart = Math.max(0, startPos - (delimiterBytes.length - 1));
>     input.seek(scanStart);
>     // ...
> }
> ```
>
> **Rationale**: Add a defensive assertion to make the algorithm's assumptions explicit even though the constructor already validates them, which eases future maintenance.
>
> ### Issue 3: API Compatibility - AccordingToSplitSizeSplitStrategy changed from abstract to concrete
> **Location**: `AccordingToSplitSizeSplitStrategy.java:50`
>
> ```java
> // Old version
> public abstract class AccordingToSplitSizeSplitStrategy implements FileSplitStrategy {
>     protected abstract InputStream getInputStream(String filePath) throws IOException;
>     protected abstract long getFileSize(String filePath) throws IOException;
> }
>
> // New version
> public class AccordingToSplitSizeSplitStrategy implements FileSplitStrategy, Closeable {
>     public AccordingToSplitSizeSplitStrategy(HadoopConf hadoopConf, ...) {
>         this.hadoopFileSystemProxy = new HadoopFileSystemProxy(hadoopConf);
>     }
> }
> ```
>
> **Related Context**:
>
> * Subclass: `LocalFileAccordingToSplitSizeSplitStrategy` (deleted)
> * No other known subclasses
>
> **Problem Description**: `AccordingToSplitSizeSplitStrategy` changed from an abstract class to a concrete class, and the constructor signature changed from `(String, long, String, long)` to `(HadoopConf, String, long, String, long)`. This is a breaking change.
>
> Although there are no other subclasses within SeaTunnel, external extensions that inherit from this class will break in two ways (see the hypothetical sketch after this list):
>
> 1. Compilation error: the parent class can no longer be instantiated through the old constructor
> 2. The abstract methods `getInputStream()`/`getFileSize()` are gone, so subclasses that override them no longer compile
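>
> For illustration, a hypothetical third-party subclass written against the old abstract API (the class name and body below are invented) would no longer compile against the new API:
>
> ```java
> import java.io.File;
> import java.io.FileInputStream;
> import java.io.IOException;
> import java.io.InputStream;
>
> // Hypothetical external subclass targeting the OLD abstract class. On 2.7.x it
> // breaks twice: the super constructor now expects a HadoopConf first argument,
> // and the abstract methods it overrides no longer exist.
> public class MyCustomSplitStrategy extends AccordingToSplitSizeSplitStrategy {
>
>     public MyCustomSplitStrategy(
>             String rowDelimiter, long skipHeaderRowNumber, String encodingName, long splitSize) {
>         super(rowDelimiter, skipHeaderRowNumber, encodingName, splitSize); // old signature
>     }
>
>     @Override
>     protected InputStream getInputStream(String filePath) throws IOException {
>         return new FileInputStream(filePath);
>     }
>
>     @Override
>     protected long getFileSize(String filePath) throws IOException {
>         return new File(filePath).length();
>     }
> }
> ```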
>
> **Potential Risks**:
>
> * **Risk 1**: Third-party Connectors inheriting this class will fail to compile
> * **Risk 2**: In a plugin architecture, user-defined File Sources may be affected
>
> **Impact Scope**:
>
> * **Direct Impact**: All classes inheriting `AccordingToSplitSizeSplitStrategy` (none internally)
> * **Indirect Impact**: Possible third-party extensions
> * **Affected Area**: File Connector ecosystem
>
> **Severity**: MINOR
>
> **Improvement Suggestions**: Option 1 (Recommended): Explicitly mark the breaking change in the release notes, requiring users upgrading to 2.7.x to check whether they inherit this class.
>
> Option 2: Keep the abstract base class and add a concrete implementation
>
> ```java
> // Keep old abstract class (deprecated)
> @Deprecated
> public abstract class LegacyAccordingToSplitSizeSplitStrategy implements FileSplitStrategy {
>     // Old abstract methods
>     protected abstract InputStream getInputStream(String filePath) throws IOException;
>     protected abstract long getFileSize(String filePath) throws IOException;
> }
>
> // New concrete implementation
> public class AccordingToSplitSizeSplitStrategy implements FileSplitStrategy, Closeable {
>     // New implementation
> }
> ```
>
> **Rationale**: Although there are no internal subclasses at the moment, API stability should be maintained for a framework library; at a minimum, mark the old class `@Deprecated` to provide a transition period.
>
> ### Issue 4: Code Duplication - HdfsFileSource and LocalFileSource have identical initialization logic
> **Location**:
>
> * `HdfsFileSource.java:36-48` (newly added)
> * `LocalFileSource.java:35-47` (modified)
>
> **Related Context**:
>
> ```java
> // HdfsFileSource.java
> private static FileSplitStrategy initFileSplitStrategy(MultipleTableHdfsFileSourceConfig config) {
>     Map<String, FileSplitStrategy> splitStrategies = new HashMap<>();
>     for (BaseFileSourceConfig fileSourceConfig : config.getFileSourceConfigs()) {
>         String tableId = fileSourceConfig.getCatalogTable().getTableId().toTablePath().toString();
>         splitStrategies.put(tableId,
>                 FileSplitStrategyFactory.initFileSplitStrategy(
>                         fileSourceConfig.getBaseFileSourceConfig(),
>                         fileSourceConfig.getHadoopConfig()));
>     }
>     return new MultipleTableFileSplitStrategy(splitStrategies);
> }
>
> // LocalFileSource.java - exactly the same!
> private static FileSplitStrategy initFileSplitStrategy(MultipleTableLocalFileSourceConfig sourceConfig) {
>     Map<String, FileSplitStrategy> splitStrategies = new HashMap<>();
>     for (BaseFileSourceConfig fileSourceConfig : sourceConfig.getFileSourceConfigs()) {
>         String tableId = fileSourceConfig.getCatalogTable().getTableId().toTablePath().toString();
>         splitStrategies.put(tableId,
>                 FileSplitStrategyFactory.initFileSplitStrategy(
>                         fileSourceConfig.getBaseFileSourceConfig(),
>                         fileSourceConfig.getHadoopConfig()));
>     }
>     return new MultipleTableFileSplitStrategy(splitStrategies);
> }
> ```
>
> **Problem Description**: The `initFileSplitStrategy()` logic in the two classes is completely identical, violating the DRY (Don't Repeat Yourself) principle. If the initialization logic needs to change in the future (e.g., adding caching or logging), two places must be modified in step.
>
> **Potential Risks**:
>
> * **Risk 1**: Increased maintenance cost (two places to keep in sync)
> * **Risk 2**: Easy to introduce inconsistencies (e.g., modifying one place and forgetting the other)
>
> **Impact Scope**:
>
> * **Direct Impact**: `HdfsFileSource`, `LocalFileSource`
> * **Indirect Impact**: FileSources added in the future (e.g., S3FileSource)
> * **Affected Area**: All File Sources
>
> **Severity**: MINOR
>
> **Improvement Suggestions**:
>
> ```java
> // BaseMultipleTableFileSource.java
> public abstract class BaseMultipleTableFileSource {
>     protected static FileSplitStrategy initFileSplitStrategy(
>             BaseMultipleTableFileSourceConfig sourceConfig) {
>         Map<String, FileSplitStrategy> splitStrategies = new HashMap<>();
>         for (BaseFileSourceConfig fileSourceConfig : sourceConfig.getFileSourceConfigs()) {
>             String tableId = fileSourceConfig.getCatalogTable().getTableId().toTablePath().toString();
>             splitStrategies.put(tableId,
>                     FileSplitStrategyFactory.initFileSplitStrategy(
>                             fileSourceConfig.getBaseFileSourceConfig(),
>                             fileSourceConfig.getHadoopConfig()));
>         }
>         return new MultipleTableFileSplitStrategy(splitStrategies);
>     }
> }
>
> // HdfsFileSource.java
> private HdfsFileSource(MultipleTableHdfsFileSourceConfig sourceConfig) {
>     super(sourceConfig, initFileSplitStrategy(sourceConfig)); // Call the parent class method
> }
>
> // LocalFileSource.java
> private LocalFileSource(MultipleTableLocalFileSourceConfig sourceConfig) {
>     super(sourceConfig, initFileSplitStrategy(sourceConfig)); // Call the parent class method
> }
> ```
>
> **Rationale**: Extract the common logic to a parent class to reduce duplication and improve maintainability.
>
> ### Issue 6: Incomplete Documentation - Missing performance recommendations and limitation notes
> **Location**: `docs/en/connectors/source/HdfsFile.md:256-265`
>
> **Related Context**: Current documentation:
>
> ```
> ### enable_file_split [boolean]
> Turn on the file splitting function, the default is false. ...
>
> ### file_split_size [long]
> File split size, which can be filled in when the enable_file_split
parameter is true. ...
> The default value is the number of bytes of 128MB, which is 134217728.
> ```
>
> **Problem Description**: The documentation is missing the following key information:
>
> 1. **Performance Recommendations**: In which scenarios should the feature be enabled, and in which should it not be?
> 2. **Limitation Notes**: Maximum number of splits? Memory overhead?
> 3. **Tuning Guide**: How to choose an appropriate `file_split_size`?
> 4. **Known Issues**: For example, compressed files and archive files do not support splitting
>
> **Potential Risks**:
>
> * **Risk 1**: Users misuse the feature (e.g., enabling splitting for small files), degrading performance
> * **Risk 2**: Users set too small a split_size (e.g., 1KB), causing OOM
> * **Risk 3**: Users expect compressed files to be splittable as well, which is not supported
>
> **Impact Scope**:
>
> * **Direct Impact**: User experience
> * **Indirect Impact**: Increased support cost
> * **Affected Area**: HdfsFile, LocalFile
>
> **Severity**: MINOR
>
> **Improvement Suggestions**:
>
> ```
> ### enable_file_split [boolean]
> Turn on the file splitting function, the default is false. ...
>
> **Performance Recommendations:**
> - **Enable when**: Reading a few large files (> 1GB each) with parallelism > 4
> - **Disable when**: Reading many small files (< 128MB each) or parallelism = 1
> - **Not supported for**: Compressed files (gzip, bzip2), archive files (zip, tar)
>
> **Limitations:**
> - Maximum splits per file: 100,000 (configurable via `seatunnel.source.file.split.max-count`)
> - Memory overhead: ~100 bytes per split
>
> ### file_split_size [long]
> File split size, which can be filled in when the enable_file_split parameter is true. ...
>
> **Tuning Guidelines:**
> - **Recommended**: 128MB - 512MB for HDFS, 64MB - 256MB for local files
> - **Too small (< 16MB)**: Excessive splits, high overhead
> - **Too large (> 1GB)**: Insufficient parallelism
> - **Formula**: `file_split_size = total_file_size / desired_parallelism`
> ```
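>
> As a rough worked example of that formula (illustrative numbers, not from the PR): a single 4 GiB file read with a desired parallelism of 16 gives `file_split_size = 4 GiB / 16 = 256 MiB`, which falls inside the recommended HDFS range above.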
>
> **Rationale**: Add performance guidelines and limitation notes to help users use the feature correctly.
>
> ### Issue 7: Architectural Complexity - Requiring HadoopConf for LocalFile leaks the Hadoop concept
> **Location**: `LocalFileSource.java:39-42`
>
> ```java
> splitStrategies.put(tableId,
>         FileSplitStrategyFactory.initFileSplitStrategy(
>                 fileSourceConfig.getBaseFileSourceConfig(),
>                 fileSourceConfig.getHadoopConfig())); // ← Does LocalFile need HadoopConf?
> ```
>
> **Related Context**:
>
> * `LocalFileHadoopConfig`: LocalFile's HadoopConf implementation (it effectively just configures `file:///`)
> * `AccordingToSplitSizeSplitStrategy`: requires `HadoopConf` to create `HadoopFileSystemProxy`
>
> **Problem Description**: To reuse `AccordingToSplitSizeSplitStrategy`, LocalFile scenarios are also forced to introduce the `HadoopConf` concept. Although technically feasible (through `LocalFileHadoopConfig`), it is conceptually unnatural:
>
> * LocalFile users typically don't care about Hadoop configuration
> * Error messages may contain "Hadoop" wording, confusing LocalFile users
>
> **Potential Risks**:
>
> * **Risk 1**: User confusion ("Why do I need Hadoop configuration to read local files?")
> * **Risk 2**: Misleading error messages (e.g., "Hadoop configuration error")
>
> **Impact Scope**:
>
> * **Direct Impact**: LocalFile Source user experience
> * **Indirect Impact**: Documentation complexity
> * **Affected Area**: LocalFile Connector
>
> **Severity**: MINOR
>
> **Improvement Suggestions**: Option 1 (Recommended): Explicitly state it in the documentation:
>
> ```
> For **LocalFile**, the `hadoop_conf` is optional and defaults to the `file:///` scheme.
> You rarely need to configure it unless you have custom Hadoop settings.
> ```
>
> Option 2 (Complex): Abstract `FileSystemProxy` interface
>
> ```java
> public interface FileSystemProxy {
> FSDataInputStream getInputStream(String path) throws IOException;
> FileStatus getFileStatus(String path) throws IOException;
> }
>
> public class HadoopFileSystemProxy implements FileSystemProxy { ... }
> public class LocalFileSystemProxy implements FileSystemProxy { ... } // Pure Java NIO
> ```
>
> **Rationale**: Option 1 is simple and effective; Option 2 has a cleaner architecture but requires more work. The current solution is acceptable.
>
> ### Issue 8: Checkpoint Compatibility Not Fully Validated
> **Location**: `FileSourceSplit.java:44-49`
>
> ```java
> public FileSourceSplit(String tableId, String filePath, long start, long length) {
> this.tableId = tableId;
> this.filePath = filePath;
> this.start = start; // ← New field assigned in constructor
> this.length = length; // ← New field assigned in constructor
> }
> ```
>
> **Related Context**:
>
> * `serialVersionUID = 1L`
> * Old version `FileSourceSplit` may only have `tableId`/`filePath`; `start`/`length` use default values 0/-1
>
> **Problem Description**: The PR does not validate restoring the new version from checkpoints taken by an old version (without the file split feature):
>
> 1. After deserializing a `FileSourceSplit` from an old checkpoint, `start=0, length=-1`
> 2. The new code reads by byte range, so `length=-1` may cause problems
> 3. Need to check whether `AbstractReadStrategy.actualRead()` correctly handles `length=-1`
>
> **Validation**: Check `AbstractReadStrategy.safeSlice()`:
>
> ```java
> protected static InputStream safeSlice(InputStream in, long start, long length) throws IOException {
>     // ...
>     return new BoundedInputStream(in, length); // How does BoundedInputStream handle length=-1?
> }
> ```
>
> Apache Commons IO's `BoundedInputStream(-1)` will read to the end of the stream, so the behavior is correct. But this is an implicit dependency.
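>
> A minimal test sketch that would pin this implicit dependency down (assuming JUnit 5 and Commons IO on the test classpath; this test is not part of the PR):
>
> ```java
> import java.io.ByteArrayInputStream;
> import java.io.IOException;
> import java.io.InputStream;
> import java.nio.charset.StandardCharsets;
>
> import org.apache.commons.io.IOUtils;
> import org.apache.commons.io.input.BoundedInputStream;
> import org.junit.jupiter.api.Assertions;
> import org.junit.jupiter.api.Test;
>
> class BoundedInputStreamSemanticsTest {
>
>     @Test
>     void negativeLengthReadsToEndOfStream() throws IOException {
>         byte[] data = "hello world".getBytes(StandardCharsets.UTF_8);
>         // In Commons IO a negative bound means "unbounded", i.e. read to EOF.
>         InputStream in = new BoundedInputStream(new ByteArrayInputStream(data), -1);
>         Assertions.assertArrayEquals(data, IOUtils.toByteArray(in));
>     }
> }
> ```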
>
> **Potential Risks**:
>
> * **Risk 1**: Jobs upgrading from 2.6.x to 2.7.x may see data inconsistency after checkpoint recovery
> * **Risk 2**: The semantics of `length=-1` are not clear enough (does it mean "read to end of file" or "split disabled"?)
>
> **Impact Scope**:
>
> * **Direct Impact**: Job recovery for upgrading users
> * **Indirect Impact**: Data consistency
> * **Affected Area**: All jobs using File Source
>
> **Severity**: MAJOR
>
> **Improvement Suggestions**:
>
> ```java
> // FileSourceSplit.java
> public FileSourceSplit(String tableId, String filePath, long start, long length) {
>     this.tableId = tableId;
>     this.filePath = filePath;
>     this.start = start;
>     // Clarify semantics: -1 means "entire file" (compatible with old versions)
>     this.length = length;
> }
>
> // AbstractReadStrategy.java
> protected static InputStream safeSlice(InputStream in, long start, long length) throws IOException {
>     if (start > 0) {
>         if (in instanceof Seekable) {
>             ((Seekable) in).seek(start);
>         } else {
>             // ...
>         }
>     }
>     // length=-1 means unlimited (compatible with old checkpoints)
>     if (length < 0) {
>         return in;
>     }
>     return new BoundedInputStream(in, length);
> }
> ```
>
> Also add tests:
>
> ```java
> @Test
> void testDeserializeFromOldCheckpoint() {
>     // Simulate a FileSourceSplit serialized by the old version
>     FileSourceSplit oldSplit = new FileSourceSplit("table", "path"); // start=0, length=-1
>     // Verify the entire file is read
>     List<String> records = readBySplit(oldSplit);
>     Assertions.assertEquals(fullFileRecords, records);
> }
> ```
>
> **Rationale**: Clarify the semantics of `length=-1` and add tests to verify upgrade compatibility.

The above suggestions have been implemented.