DanielCarter-stack commented on PR #10437:
URL: https://github.com/apache/seatunnel/pull/10437#issuecomment-3834496279
### Issue 1: Insufficient Unit Test Coverage - BinaryReadStrategy Path Resolution Logic
**Location**:
- `seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/BinaryReadStrategy.java:103-111`
**Related Context**:
- Modified method: `BinaryReadStrategy.resolveBinaryRelativePath()`
- Dependent method: `AbstractReadStrategy.resolveRelativePath()` (line 783)
- Caller: `BinaryReadStrategy.read()` (line 85)
**Problem Description**:
The newly added `resolveBinaryRelativePath()` method lacks unit test
coverage, and key logic is unverified:
1. Initialization logic when `basePathIsFile` is null
2. Logic to return filename when `basePathIsFile` is true
3. Logic to call parent class method when `basePathIsFile` is false
4. Correct handling of SFTP/FTP path formats (e.g.,
`sftp://server/path/file.txt`)
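To make the expected behaviour of these four points concrete, here is a dependency-free model of the three branches that could be unit-tested without mocks. It is a sketch, not the PR's code: `RelativePathSketch`, the `Predicate` stand-in for `hadoopFileSystemProxy.isFile`, and the URI-based `relativize` helper are all hypothetical, and the real `AbstractReadStrategy.resolveRelativePath()` may behave differently.

```java
import java.net.URI;
import java.util.function.Predicate;

/** Hypothetical model of resolveBinaryRelativePath(); not the PR's implementation. */
final class RelativePathSketch {
    private final String basePath;
    private final Predicate<String> isFile; // stand-in for hadoopFileSystemProxy.isFile
    private Boolean basePathIsFile;         // lazily initialized, as in the PR

    RelativePathSketch(String basePath, Predicate<String> isFile) {
        this.basePath = basePath;
        this.isFile = isFile;
    }

    String resolveBinaryRelativePath(String filePath) {
        if (basePathIsFile == null) {          // point 1: first call probes the base path
            basePathIsFile = isFile.test(basePath);
        }
        if (basePathIsFile) {                  // point 2: base is a file -> file name only
            return fileName(filePath);
        }
        return relativize(basePath, filePath); // point 3: base is a directory
    }

    /** Last path segment; works for plain paths and sftp:// or ftp:// URIs. */
    static String fileName(String path) {
        String p = pathPart(path);
        return p.substring(p.lastIndexOf('/') + 1);
    }

    /** Hypothetical relativization that compares only the URI path component (point 4). */
    static String relativize(String base, String full) {
        String b = pathPart(base);
        String f = pathPart(full);
        String prefix = b.endsWith("/") ? b : b + "/";
        if (!f.startsWith(prefix)) {
            throw new IllegalArgumentException(full + " is not under " + base);
        }
        return f.substring(prefix.length());
    }

    /** Drops scheme and authority: "sftp://server:22/path" becomes "/path".
     *  Assumes paths are valid URIs (no unescaped spaces). */
    private static String pathPart(String path) {
        return URI.create(path).getPath();
    }
}
```

Keeping the file-system probe behind a predicate means the file/directory and local/remote cases can be exercised as plain unit tests, while the probe count (one call, however many files are resolved) can be checked separately.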
**Potential Risks**:
- **Risk 1**: Path resolution errors may produce incorrect relative paths, leading to:
  - update mode failing to match source and destination files correctly
  - wrong `relativePath` field values, which affect file write paths on the Sink side
- **Risk 2**: For remote file systems like SFTP/FTP, path formats differ
from local ones, and edge cases may not be covered
- **Risk 3**: Without tests, future refactoring (e.g., modifying the `resolveRelativePath` method) may break this logic unnoticed
**Impact Scope**:
- **Direct Impact**:
  - `SeaTunnelRow.relativePath` field generated by `BinaryReadStrategy.read()` method
  - All binary update mode jobs using FtpFile/SftpFile/LocalFile Connectors
- **Indirect Impact**:
  - Sink side logic for writing files based on `relativePath`
  - File system directory structure preservation
- **Affected Components**: Multiple Connectors (FtpFile/SftpFile/LocalFile)
**Severity**: MAJOR
**Improvement Suggestions**:
```java
// Add tests in BinaryReadStrategyTest.java (new test class required).
// Assumptions: resolveBinaryRelativePath() is relaxed to package-private for testing,
// a test-only setHadoopFileSystemProxy() setter exists, and createMockConfig(...) and
// mockHadoopConf are helpers defined in the test class.
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.IOException;

import org.junit.jupiter.api.Test;

@Test
public void testResolveBinaryRelativePath_WhenBaseIsFile() throws IOException {
    HadoopFileSystemProxy mockProxy = mock(HadoopFileSystemProxy.class);
    when(mockProxy.isFile("/ftp/path/file.txt")).thenReturn(true);
    BinaryReadStrategy strategy = new BinaryReadStrategy();
    strategy.setPluginConfig(createMockConfig("/ftp/path/file.txt"));
    strategy.init(mockHadoopConf);
    // Inject the mock after init(), which may create its own proxy; with the lazy
    // basePathIsFile check, the proxy is first used by resolveBinaryRelativePath().
    strategy.setHadoopFileSystemProxy(mockProxy);
    // Base path is a file: only the file name is returned.
    assertEquals("file.txt", strategy.resolveBinaryRelativePath("/ftp/path/file.txt"));
}

@Test
public void testResolveBinaryRelativePath_WhenBaseIsDirectory() throws IOException {
    HadoopFileSystemProxy mockProxy = mock(HadoopFileSystemProxy.class);
    when(mockProxy.isFile("/ftp/path")).thenReturn(false);
    BinaryReadStrategy strategy = new BinaryReadStrategy();
    strategy.setPluginConfig(createMockConfig("/ftp/path"));
    strategy.init(mockHadoopConf);
    strategy.setHadoopFileSystemProxy(mockProxy);
    // Base path is a directory: the sub-directory structure is preserved.
    assertEquals(
            "sub/file.txt", strategy.resolveBinaryRelativePath("/ftp/path/sub/file.txt"));
}

@Test
public void testResolveBinaryRelativePath_SftpPath() throws IOException {
    HadoopFileSystemProxy mockProxy = mock(HadoopFileSystemProxy.class);
    when(mockProxy.isFile("sftp://server:22/path")).thenReturn(false);
    BinaryReadStrategy strategy = new BinaryReadStrategy();
    strategy.setPluginConfig(createMockConfig("sftp://server:22/path"));
    strategy.init(mockHadoopConf);
    strategy.setHadoopFileSystemProxy(mockProxy);
    // Scheme and authority must not leak into the relative path.
    assertEquals(
            "file.txt", strategy.resolveBinaryRelativePath("sftp://server:22/path/file.txt"));
}
```
**Rationale**:
- These tests verify that path resolution is correct for file vs. directory base paths and for local vs. remote (SFTP/FTP) paths
- Prevent breaking this logic during future refactoring
- Improve code maintainability and reliability
---
### Issue 2: Missing Critical Logging - Update Mode Runtime Status
**Location**:
- `seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java:676-739`
**Related Context**:
- Modified method: `AbstractReadStrategy.shouldSyncFileInUpdateMode()`
- Caller: `AbstractReadStrategy.getFileNames()` (lines 168 and 176)
**Problem Description**:
The key runtime state of update mode is not logged, which means:
1. Users cannot confirm whether update mode is correctly enabled
2. Users cannot see file filtering statistics (how many files were scanned and how many were skipped)
3. No debugging information is available when file comparison fails
4. The performance overhead of update mode cannot be monitored
**Potential Risks**:
- **Risk 1**: When users misconfigure update mode (e.g., an incorrect `target_path`), the problem cannot be located quickly
- **Risk 2**: With large numbers of files, there is no way to tell whether update mode actually saves transfer overhead
- **Risk 3**: Troubleshooting in production environments is difficult (e.g., when some files are not skipped as expected)
**Impact Scope**:
- **Direct Impact**:
  - All users using `sync_mode=update`
  - Operations observability
- **Indirect Impact**:
  - Troubleshooting efficiency
  - User confidence in update mode
- **Affected Components**: Multiple Connectors (HDFS/FtpFile/SftpFile/LocalFile/OSS/OBS/S3/COS/Jindo-OSS)
**Severity**: MAJOR
**Improvement Suggestions**:
```java
// In AbstractReadStrategy.init(): log the effective update-mode configuration
if (enableUpdateSync) {
    log.info(
            "Update sync mode enabled: sourceRootPath={}, targetPath={}, "
                    + "updateStrategy={}, compareMode={}",
            sourceRootPath,
            targetPath,
            updateStrategy,
            compareMode);
}

// New fields in AbstractReadStrategy, updated by shouldSyncFileInUpdateMode().
// Reset both to 0 at the start of getFileNames() if it can run more than once.
private int totalFilesScanned = 0;
private int totalFilesFiltered = 0;

private boolean shouldSyncFileInUpdateMode(FileStatus sourceFileStatus) throws IOException {
    totalFilesScanned++;
    if (!enableUpdateSync) {
        return true;
    }
    // ... existing logic: resolve sourceFilePath / targetFilePath and compare them ...
    boolean shouldSync = /* result of the existing comparison logic */;
    if (!shouldSync) {
        totalFilesFiltered++;
        log.debug(
                "File skipped by update mode: sourceFile={}, targetFile={}, reason={}",
                sourceFilePath,
                targetFilePath,
                "already up-to-date");
    }
    return shouldSync;
}

// At the end of getFileNames()
if (enableUpdateSync) {
    log.info(
            "Update sync mode statistics: scanned={}, filtered={}, synced={}",
            totalFilesScanned,
            totalFilesFiltered,
            totalFilesScanned - totalFilesFiltered);
}
```
**Rationale**:
- INFO level logs let users confirm update mode is enabled and configured
- DEBUG level logs provide detailed file filtering information (for
debugging)
- Statistics logs help users evaluate update mode effectiveness
- Aligns with SeaTunnel observability best practices
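If the counters grow beyond two fields, one option is to keep them in a small helper that is reset per listing pass, so repeated `getFileNames()` calls do not accumulate stale totals. A minimal sketch, assuming a single listing thread (plain `int` counters); `UpdateSyncStats` is a hypothetical name, not an existing SeaTunnel class:

```java
/** Hypothetical holder for update-mode counters (not part of SeaTunnel). */
final class UpdateSyncStats {
    private int scanned;
    private int filtered;

    /** Call at the start of each getFileNames() pass. */
    void reset() {
        scanned = 0;
        filtered = 0;
    }

    /** Records one decision made by shouldSyncFileInUpdateMode(). */
    void record(boolean shouldSync) {
        scanned++;
        if (!shouldSync) {
            filtered++;
        }
    }

    int scanned() {
        return scanned;
    }

    int filtered() {
        return filtered;
    }

    int synced() {
        return scanned - filtered;
    }

    /** Message text matching the INFO statistics line suggested above. */
    String summary() {
        return String.format(
                "Update sync mode statistics: scanned=%d, filtered=%d, synced=%d",
                scanned, filtered, synced());
    }
}
```

If listing were ever parallelized, the `int` fields would need to become `AtomicInteger` or `LongAdder`.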
---
### Issue 3: E2E Tests Do Not Cover STRICT Strategy and CHECKSUM Comparison Mode
**Location**:
- `seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java:426-451`
**Related Context**:
- Existing tests: `testLocalFileBinaryUpdateModeDistcp()` (only tests DISTCP
strategy)
- Test configuration: `local_file_binary_update_distcp.conf`
**Problem Description**:
E2E tests only cover the combination of `update_strategy=distcp` and
`compare_mode=len_mtime`, not testing:
1. `update_strategy=strict` + `compare_mode=len_mtime`: Strict comparison
mode (length and modification time must be exactly equal)
2. `update_strategy=strict` + `compare_mode=checksum`: Checksum-based
comparison (most strict)
3. Error scenarios: e.g., validation of `compare_mode=checksum` +
`update_strategy=distcp` (should be rejected during configuration validation)
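A compact model of these three cases can help pin down the expected outcomes before writing the E2E tests. The semantics below are assumptions inferred from this review, not taken from the PR's `shouldSyncFileInUpdateMode()`: DISTCP skips when lengths match and the target is not older than the source; STRICT skips only on an exact length and mtime match; CHECKSUM is only valid with STRICT. `UpdateDecision` and `Meta` are hypothetical names.

```java
import java.util.Arrays;

/** Hypothetical model of the update-mode decision; the semantics are assumptions. */
final class UpdateDecision {
    enum UpdateStrategy { DISTCP, STRICT }

    enum CompareMode { LEN_MTIME, CHECKSUM }

    /** Minimal file metadata; checksum may be null if the filesystem cannot supply one. */
    static final class Meta {
        final long length;
        final long mtimeMillis;
        final byte[] checksum;

        Meta(long length, long mtimeMillis, byte[] checksum) {
            this.length = length;
            this.mtimeMillis = mtimeMillis;
            this.checksum = checksum;
        }
    }

    /** Configuration check: reject the combination this review says should be invalid. */
    static void validate(UpdateStrategy strategy, CompareMode mode) {
        if (strategy == UpdateStrategy.DISTCP && mode == CompareMode.CHECKSUM) {
            throw new IllegalArgumentException(
                    "compare_mode=checksum is not supported with update_strategy=distcp");
        }
    }

    /** Returns true if the source file should be copied; dst == null means no target file. */
    static boolean shouldSync(UpdateStrategy strategy, CompareMode mode, Meta src, Meta dst) {
        validate(strategy, mode);
        if (dst == null) {
            return true; // target missing: always copy
        }
        if (mode == CompareMode.CHECKSUM) { // only reachable with STRICT after validate()
            if (src.checksum == null || dst.checksum == null) {
                throw new IllegalStateException("filesystem did not provide a checksum");
            }
            return src.length != dst.length || !Arrays.equals(src.checksum, dst.checksum);
        }
        if (strategy == UpdateStrategy.STRICT) {
            // Skip only when length and mtime match exactly.
            return src.length != dst.length || src.mtimeMillis != dst.mtimeMillis;
        }
        // DISTCP + LEN_MTIME: copy when sizes differ or the source is newer (clock-sensitive).
        return src.length != dst.length || src.mtimeMillis > dst.mtimeMillis;
    }
}
```

Under these assumptions, a target with the same length but a newer mtime is the case that separates the two strategies: STRICT copies it, DISTCP skips it.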
**Potential Risks**:
- **Risk 1**: STRICT strategy logic errors not discovered (e.g., reversed
comparison logic)
- **Risk 2**: CHECKSUM comparison mode implementation issues not exposed
(e.g., some file systems don't support checksum)
- **Risk 3**: Configuration validation logic omissions (e.g., illegal
combinations not rejected)
**Impact Scope**:
- **Direct Impact**:
  - Users needing strict comparison mode (high data consistency requirements)
  - Users using CHECKSUM comparison (need more precise change detection)
- **Indirect Impact**:
  - User confidence in update mode
  - Production data consistency risks
- **Affected Components**: All Connectors supporting update mode
**Severity**: MINOR (core DISTCP strategy is tested, but supplementation
recommended)
**Improvement Suggestions**:
```java
// Add tests in LocalFileIT.java.
// local_file_binary_update_strict.conf would be a new config file mirroring
// local_file_binary_update_distcp.conf with update_strategy=strict, compare_mode=len_mtime.
@TestTemplate
@DisabledOnContainer(
        value = {},
        type = {EngineType.FLINK, EngineType.SPARK},
        disabledReason = "sync_mode=update needs same filesystem")
public void testLocalFileBinaryUpdateModeStrict(TestContainer container)
        throws IOException, InterruptedException {
    resetUpdateTestPath();
    putLocalFile("/tmp/seatunnel/update/src/test.bin", "abc");
    TestHelper helper = new TestHelper(container);

    // First run: the target does not exist yet, so the file is copied.
    helper.execute("/binary/local_file_binary_update_strict.conf");
    Assertions.assertEquals("abc", readLocalFile("/tmp/seatunnel/update/dst/test.bin"));

    // STRICT + LEN_MTIME: give the target the same length but different content and mtime.
    putLocalFile("/tmp/seatunnel/update/dst/test.bin", "zzz");
    // Wait 1 second so the rewritten source gets a different modification time.
    Thread.sleep(1000);
    putLocalFile("/tmp/seatunnel/update/src/test.bin", "abc"); // restore original content
    helper.execute("/binary/local_file_binary_update_strict.conf");

    // Equal length is not enough in STRICT mode: differing mtimes trigger an overwrite.
    Assertions.assertEquals("abc", readLocalFile("/tmp/seatunnel/update/dst/test.bin"));

    baseContainer.execInContainer("sh", "-c", "rm -rf /tmp/seatunnel/update");
}
```
**Rationale**:
- Covering STRICT strategy validates strict comparison logic correctness
- Prevents breaking STRICT strategy behavior during future modifications
- Provides complete test coverage, improving user confidence
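The `Thread.sleep(1000)` in the test above relies on wall-clock timing and leaves the intended relationship between the two timestamps implicit. A possible alternative is to pin modification times explicitly. The sketch below uses standard `java.nio.file` calls on a local path; `MtimeFixture` is a hypothetical helper, and because `LocalFileIT` works with files inside the container, the equivalent there would have to run in the container (for example `touch -d` via `execInContainer`), which is an assumption about the test setup.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileTime;

/** Hypothetical helper: prepare source/target pairs with controlled length and mtime. */
final class MtimeFixture {
    /** Writes content and pins the modification time instead of sleeping between writes. */
    static Path write(Path file, String content, long mtimeMillis) throws IOException {
        Files.createDirectories(file.getParent());
        Files.write(file, content.getBytes(StandardCharsets.UTF_8));
        // Whole-second values are safest: some filesystems store mtime at 1 s granularity.
        Files.setLastModifiedTime(file, FileTime.fromMillis(mtimeMillis));
        return file;
    }

    /** True when both files have the same length and mtime (the STRICT + LEN_MTIME "skip" case). */
    static boolean sameLenAndMtime(Path a, Path b) throws IOException {
        return Files.size(a) == Files.size(b)
                && Files.getLastModifiedTime(a).equals(Files.getLastModifiedTime(b));
    }
}
```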
---
### Issue 4: Documentation Does Not Explain Performance Impact and Limitations
**Location**:
- `docs/en/connectors/source/FtpFile.md:442-457`
- `docs/en/connectors/source/SftpFile.md:445-460`
- `docs/en/connectors/source/LocalFile.md:442-457`
**Related Context**:
- New documentation section: `### sync_mode [string]` and related
configuration descriptions
**Problem Description**:
Documentation does not explain the following key information about update
mode:
1. **Performance Impact**: Each source file requires additional RPC calls
(`getFileStatus`) to get destination file status
2. **Applicable Scenarios**: Suitable for scenarios with moderate file
counts and high duplication rates; not suitable for massive small file scenarios
3. **File System Requirements**: Source/destination file systems need to
support efficient `getFileStatus` operations
4. **Time Synchronization Requirements**: DISTCP strategy depends on time
synchronization between source/destination file systems
5. **CHECKSUM Limitations**: Some file systems (like FTP) may not support
checksum operations
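A rough estimate illustrates point 1. Assuming each source file costs one sequential `getFileStatus` round trip, and taking 5 ms as a purely hypothetical RTT to a remote FTP/SFTP server, 10,000 files add about 50 seconds of listing overhead, while 1,000,000 files add about 5,000 seconds (roughly 83 minutes). The helper name `UpdateModeOverhead` is hypothetical:

```java
/** Back-of-envelope cost of one extra metadata round trip per source file. */
final class UpdateModeOverhead {
    /**
     * @param files       number of source files listed
     * @param rttMillis   assumed round-trip time of one getFileStatus call
     * @param parallelism number of calls in flight at once (1 = sequential)
     * @return estimated extra wall-clock seconds spent on comparisons
     */
    static double extraSeconds(long files, double rttMillis, int parallelism) {
        if (files < 0 || rttMillis < 0 || parallelism < 1) {
            throw new IllegalArgumentException("invalid input");
        }
        return files * rttMillis / parallelism / 1000.0;
    }
}
```

The estimate ignores the cost of the comparison itself, which would be higher for `compare_mode=checksum`.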
**Potential Risks**:
- **Risk 1**: Users may enable update mode with massive file counts (e.g., millions of files) and hit serious performance problems
- **Risk 2**: Unsynchronized file system clocks can cause wrong DISTCP decisions (e.g., when the source clock lags behind the destination)
- **Risk 3**: Users may misunderstand CHECKSUM mode availability (FTP is not supported)
**Impact Scope**:
- **Direct Impact**:
  - All users planning to use `sync_mode=update`
  - Production environment performance and stability
- **Indirect Impact**:
  - User support costs (tickets due to performance issues)
  - User satisfaction with SeaTunnel
- **Affected Components**: All Connectors using update mode
**Severity**: MINOR (documentation issue, does not affect functionality)
**Improvement Suggestions**:
Add the following content to documentation:
```markdown
### sync_mode [string]
File sync mode. Supported values: `full` (default), `update`.
When set to `update`, the source compares files between source and target and only
reads new or changed files (currently only `file_format_type=binary` is supported).
**Performance Considerations:**
- Update mode requires an additional `getFileStatus` RPC call for each
source file to compare with the target.
- For remote file systems (FTP/SFTP), this adds network overhead per file.
- Recommended for scenarios with: moderate file count (thousands to hundreds
of thousands), high duplicate rate.
- Not recommended for: massive small files (millions), low-latency
requirements.
**Requirements:**
- Source and target filesystem clocks should be synchronized when using
`update_strategy=distcp`.
- `compare_mode=checksum` requires the filesystem to support `getFileChecksum`
  (FTP/SFTP may not support it).
- `target_path` should typically align with sink `path` (same filesystem and
relative paths).
```
**Rationale**:
- Clarifying performance impact helps users choose appropriate mode
- Explaining limitations prevents misuse
- Provides best practice recommendations
---
### Issue 5: Potential Race Condition for basePathIsFile Field in Concurrent Scenarios
**Location**:
- `seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/BinaryReadStrategy.java:49, 104-106`
**Related Context**:
- Field definition: `private transient Boolean basePathIsFile;`
- Usage location: `resolveBinaryRelativePath()` method
**Problem Description**:
`basePathIsFile` field uses lazy initialization pattern, but theoretically
has a race condition:
```java
if (basePathIsFile == null) {
basePathIsFile = hadoopFileSystemProxy.isFile(basePath);
}
```
Although SeaTunnel's Source Reader is single-threaded, if multi-threaded reading or concurrent initialization is introduced in the future, this could lead to:
- Multiple threads simultaneously seeing `basePathIsFile == null`
- Multiple calls to `hadoopFileSystemProxy.isFile(basePath)`
- No harm if the calls return the same result, but if the state of `basePath` changes between two calls, behavior becomes unpredictable
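If lazy initialization were kept and the reader did become multi-threaded, double-checked locking on a `volatile` field is a standard way to remove the race described above while still deferring the RPC. A minimal sketch; `LazyFlag` and `IoSupplier` are hypothetical names, and the loader stands in for `hadoopFileSystemProxy.isFile(basePath)`:

```java
import java.io.IOException;

/** Hypothetical thread-safe lazy flag: the loader runs at most once after a successful load. */
final class LazyFlag {
    /** Like Supplier, but allowed to throw IOException (as isFile() can). */
    interface IoSupplier {
        boolean get() throws IOException;
    }

    private final IoSupplier loader;
    private volatile Boolean value; // volatile: required for safe double-checked locking

    LazyFlag(IoSupplier loader) {
        this.loader = loader;
    }

    boolean get() throws IOException {
        Boolean v = value;         // first check, without locking
        if (v == null) {
            synchronized (this) {
                v = value;         // second check, under the lock
                if (v == null) {
                    v = loader.get();
                    value = v;
                }
            }
        }
        return v;
    }
}
```

If the loader throws, the value stays unset, so the next call retries instead of caching a failure.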
**Potential Risks**:
- **Risk 1**: Race conditions may occur in future multi-threaded scenarios
(low probability)
- **Risk 2**: Code reviewers are likely to flag this as a concurrency-safety issue
- **Risk 3**: If `basePathIsFile` is serialized/deserialized, `transient`
will cause re-initialization (but this is expected behavior)
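Risk 3 matters for Solution 1 below: a `transient` field that is computed once (for example in `init()`) comes back as `null` after Java serialization, so `Boolean.TRUE.equals(basePathIsFile)` would silently take the directory branch. A small demonstration with plain `java.io` serialization; `CachedFlagHolder` is a hypothetical class, and whether the read strategy is actually serialized after `init()` depends on how the job is deployed:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

/** Hypothetical stand-in for a read strategy holding a cached flag. */
final class CachedFlagHolder implements Serializable {
    private static final long serialVersionUID = 1L;

    transient Boolean basePathIsFile; // skipped by Java serialization, null after readObject
    Boolean persistedFlag;            // written and restored normally

    /** Serializes and deserializes the object, as would happen if it were shipped to a worker. */
    static CachedFlagHolder roundTrip(CachedFlagHolder in) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(in);
            }
            try (ObjectInputStream inStream =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (CachedFlagHolder) inStream.readObject();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

With lazy initialization the `null` simply triggers a fresh `isFile()` call; with eager initialization, the field should either not be `transient` or keep a `null` fallback.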
**Impact Scope**:
- **Direct Impact**: `BinaryReadStrategy.resolveBinaryRelativePath()` method
- **Indirect Impact**: Future multi-threaded reading scenarios (if
introduced)
- **Affected Components**: Only BinaryReadStrategy
**Severity**: MINOR (no actual risk in current environment, but code not
robust enough)
**Improvement Suggestions**:
Solution 1: Pre-initialize in `init()` method (recommended)
```java
// Field: no longer transient, because the value is now computed once in init();
// a transient field would come back as null if the strategy is serialized after init().
private Boolean basePathIsFile;

@Override
public void init(HadoopConf conf) {
    super.init(conf);
    // Assumes setPluginConfig() has already been called before init()
    basePath = pluginConfig.getString(FileBaseSourceOptions.FILE_PATH.key());
    // Determine the basePath type once, during initialization
    try {
        basePathIsFile = hadoopFileSystemProxy.isFile(basePath);
    } catch (IOException e) {
        throw new FileConnectorException(
                SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
                "Failed to determine if base path is a file: " + basePath,
                e);
    }
    // ... other initialization logic ...
}

// Simplified resolveBinaryRelativePath method
private String resolveBinaryRelativePath(String filePath) {
    if (Boolean.TRUE.equals(basePathIsFile)) {
        return new Path(filePath).getName();
    }
    return resolveRelativePath(basePath, filePath);
}
```
Solution 2: If keeping lazy initialization, add a comment explaining why it is safe
```java
// Lazily initialized to avoid an unnecessary RPC call.
// Not synchronized: each Source Reader uses its read strategy from a single thread.
// transient: after deserialization the value is simply recomputed on first use.
private transient Boolean basePathIsFile;
```
**Rationale**:
- Solution 1 is more robust, avoiding potential race conditions
- Solution 1 exposes errors during the initialization phase (rather than on first read)
- Solution 2 keeps the current implementation but adds comments to reduce the risk of misreading
- Solution 1 is recommended
---
### Issue 6: Reduced Encapsulation After Changing resolveRelativePath Method to Protected
**Location**:
- `seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java:783`
**Related Context**:
- Modified method: `AbstractReadStrategy.resolveRelativePath()` (private →
protected)
- Caller: `BinaryReadStrategy.resolveBinaryRelativePath()` (line 110)
**Problem Description**:
Changing `resolveRelativePath` from `private` to `protected` allows all
subclasses to access this method, reducing encapsulation:
- Currently only `BinaryReadStrategy` needs to use this method
- The other 8 subclasses (CsvReadStrategy, JsonReadStrategy, etc.) neither need nor should use this method
- If other subclasses misuse this method, unexpected path resolution
behavior may occur
**Potential Risks**:
- **Risk 1**: Future developers may misuse this method in subclasses (e.g.,
`TextReadStrategy` calling it to resolve paths)
- **Risk 2**: Changes to the `resolveRelativePath` implementation may affect multiple subclasses
- **Risk 3**: Increased code maintenance complexity (need to check this
method usage across multiple subclasses)
**Impact Scope**:
- **Direct Impact**: All subclasses of `AbstractReadStrategy` (9 total)
- **Indirect Impact**: Future code maintenance
- **Affected Components**: connector-file-base module
**Severity**: MINOR (design trade-off, acceptable)
**Improvement Suggestions**:
Solution 1: Keep protected, but add JavaDoc explanation (recommended)
```java
/**
 * Resolves relative path from a base path to a full file path.
 *
 * <p><b>NOTE:</b> This method is intended for internal use by specific read strategies
 * (e.g., {@link BinaryReadStrategy}) that need custom path resolution logic. Other
 * subclasses should not rely on this method as it may change without notice.
 *
 * @param basePath the base directory path
 * @param fullFilePath the full file path
 * @return the relative path from base to file
 */
protected static String resolveRelativePath(String basePath, String fullFilePath) {
    // ... existing implementation ...
}
```
Solution 2: Create dedicated protected method for BinaryReadStrategy use
```java
// In AbstractReadStrategy
private static String resolveRelativePath(String basePath, String fullFilePath) {
    // Keep existing implementation
}

// Add new method specifically for BinaryReadStrategy
protected static String resolveRelativePathForBinary(String basePath, String fullFilePath) {
    return resolveRelativePath(basePath, fullFilePath);
}

// In BinaryReadStrategy
private String resolveBinaryRelativePath(String filePath) throws IOException {
    if (basePathIsFile == null) {
        basePathIsFile = hadoopFileSystemProxy.isFile(basePath);
    }
    if (Boolean.TRUE.equals(basePathIsFile)) {
        return new Path(filePath).getName();
    }
    return resolveRelativePathForBinary(basePath, filePath);
}
```
**Rationale**:
- Solution 1 minimizes changes, reducing misuse risk through documentation
- Solution 2 maintains better encapsulation but increases code complexity
- Considering current PR scope, Solution 1 is recommended
- If multiple subclasses need this method in the future, consider
refactoring to a common utility class
---
--
This is an automated message from the Apache Git Service.