DanielCarter-stack commented on PR #10473:
URL: https://github.com/apache/seatunnel/pull/10473#issuecomment-3873152255
<!-- code-pr-reviewer -->
<!-- cpr:pr_reply_v2_parts {"group": "apache/seatunnel#10473", "part": 1,
"total": 1} -->
### Issue 1: ConfigUtil.convertToDuration() format preference may be counterintuitive
**Location**:
`seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util/ConfigUtil.java:191-196`
```java
// Prefer ISO-8601 duration format first, e.g. PT10S.
try {
    return Duration.parse(value);
} catch (Exception ignored) {
    // Try shorthand format next.
}
```
**Related Context**:
- Callers: All configurations using `Option.durationType()`
- Example: `SCAN_INTERVAL`, defined in `FileBaseSourceOptions.java` lines 42-47
**Problem Description**:
The code comment says "prefer ISO-8601 format", but the actual behavior is:
if the user inputs `"10S"`, `Duration.parse("10S")` will throw an exception
(because ISO-8601 requires `PT10S`), then it will try the shorthand format.
This means:
1. When the user inputs `10S`, an exception is thrown and silently swallowed before the shorthand parser runs
2. When the user inputs `PT10S`, the ISO-8601 parser handles it directly
3. Yet the documentation and examples all recommend the `10S` format
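This ordering can be verified with a standalone check, independent of `ConfigUtil` and using only `java.time`:

```java
import java.time.Duration;
import java.time.format.DateTimeParseException;

public class DurationFormatDemo {
    public static void main(String[] args) {
        // ISO-8601 requires the "PT" prefix, so the documented shorthand fails first:
        try {
            Duration.parse("10S");
            System.out.println("10S parsed");
        } catch (DateTimeParseException e) {
            System.out.println("10S rejected by ISO-8601 parser");
        }
        // Only the ISO-8601 form parses directly:
        System.out.println("PT10S -> " + Duration.parse("PT10S").getSeconds() + "s");
    }
}
```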
**Potential Risks**:
- The exception-driven control flow adds minor overhead on every parse of the recommended format
- If logging is ever added to the exception handler (as some frameworks do), the recommended format will generate constant noise
- The documentation recommends shorthand while the code prioritizes ISO-8601, which is confusing
**Impact Scope**:
- Direct impact: All File Source users using `scan_interval`
- Indirect impact: Other Connectors that may use Duration type in the future
**Severity**: MINOR
**Improvement Suggestions**:
```java
// Shorthand format is recommended and checked first for better performance.
String normalized = value.replaceAll("\\s+", "").toUpperCase(Locale.ROOT);
try {
    if (normalized.endsWith("MS")) {
        return Duration.ofMillis(Long.parseLong(normalized.substring(0, normalized.length() - 2)));
    } else if (normalized.endsWith("S")) {
        return Duration.ofSeconds(Long.parseLong(normalized.substring(0, normalized.length() - 1)));
    } else if (normalized.endsWith("M")) {
        return Duration.ofMinutes(Long.parseLong(normalized.substring(0, normalized.length() - 1)));
    } else if (normalized.endsWith("H")) {
        return Duration.ofHours(Long.parseLong(normalized.substring(0, normalized.length() - 1)));
    } else if (normalized.endsWith("D")) {
        return Duration.ofDays(Long.parseLong(normalized.substring(0, normalized.length() - 1)));
    }
} catch (NumberFormatException e) {
    // Fall through to ISO-8601 parsing for a more descriptive error message.
}
// Try ISO-8601 duration format (e.g. PT10S) as fallback.
try {
    return Duration.parse(value);
} catch (Exception e) {
    throw new IllegalArgumentException(
            String.format(
                    "Could not parse duration value '%s'. Supported formats: "
                            + "shorthand (e.g. 10S, 500MS) or ISO-8601 (e.g. PT10S).",
                    value),
            e);
}
```
**Rationale**:
1. Since the documentation recommends shorthand format, the code should
prioritize processing it
2. Avoid unnecessary exception handling
3. Keep code behavior consistent with documentation
---
### Issue 2: LATEST mode boundary behavior may lead to file loss
**Location**:
`seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/ContinuousMultipleTableFileSourceSplitEnumerator.java:474-477`
```java
if (startMode == FileStartMode.LATEST
        && sourceFileStatus.getModificationTime() <= baselineStartMillis) {
    return false;
}
```
**Related Context**:
- Caller: `shouldProcess()`, invoked from `scanOnce()` at line 263
- Related configuration: `FileBaseSourceOptions.START_MODE` (lines 49-57)
- Related documentation: LocalFile.md lines 434-439
**Problem Description**:
When `startMode=LATEST`, the code skips files with modification time `<=
baselineStartMillis`. This means:
- If a file is modified at the moment the job starts
(`baselineStartMillis`), it will be skipped
- This semantic is consistent with the documentation description "only
process files modified after the job starts"
- **However**: the documentation says "after", which normally means strictly greater than (`>`); the code skips `<=`, which is exactly equivalent, so the boundary logic itself is correct. The real issues are:
1. **Clock precision**: clocks on different machines may not be synchronized, and there can be millisecond-level differences between the source filesystem and the SeaTunnel Engine
2. **High-frequency modification**: if files are modified at high frequency, files that land exactly on the time boundary may be missed
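To make the boundary concrete, here is a minimal sketch of the comparison with illustrative constants (not taken from the PR):

```java
public class LatestModeBoundaryDemo {
    public static void main(String[] args) {
        long baselineStartMillis = 1_700_000_000_000L; // illustrative job-start instant
        long bufferMillis = 1_000L;                    // proposed clock-skew buffer

        // A file touched at exactly the start instant is skipped by the current `<=` check.
        long modTime = baselineStartMillis;
        System.out.println("skipped without buffer: " + (modTime <= baselineStartMillis));

        // With a buffered threshold the same file is kept.
        long effectiveThreshold = baselineStartMillis - bufferMillis;
        System.out.println("skipped with buffer: " + (modTime <= effectiveThreshold));
    }
}
```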
**Potential Risks**:
- In high-frequency file update scenarios, certain files may be permanently
skipped
- If the source filesystem clock is ahead of the Engine clock, new files may be incorrectly skipped
**Impact Scope**:
- Direct impact: Users using `discovery_mode=CONTINUOUS` + `startMode=LATEST`
- Impact surface: All 4 File connectors
**Severity**: MAJOR
**Improvement Suggestions**:
```java
// Add a small buffer (e.g. 1 second) to account for clock skew
private static final long LATEST_MODE_BUFFER_MILLIS = 1000L;

private boolean shouldProcess(
        FileStatus sourceFileStatus, long baselineStartMillis, FileStartMode startMode)
        throws IOException {
    if (startMode == FileStartMode.LATEST) {
        // Use a buffer to account for clock skew between filesystem and engine
        long effectiveThreshold = baselineStartMillis - LATEST_MODE_BUFFER_MILLIS;
        if (sourceFileStatus.getModificationTime() <= effectiveThreshold) {
            return false;
        }
    }
    return shouldSyncInUpdateMode(sourceFileStatus);
}
```
**Rationale**:
1. Adding a small buffer can tolerate clock desynchronization
2. This may lead to a very small amount of duplicate processing, but is
safer than file loss
3. This behavior needs to be explicitly documented
**Or (simpler solution)**:
Explicitly document:
```
start_mode=LATEST will skip files with modification time <= job start time.
If your source filesystem clock is ahead of the engine clock, consider using
start_mode=EARLIEST with file_filter_modified_start.
```
---
### Issue 3: inFlightSplits may leak after error recovery
**Location**: `ContinuousMultipleTableFileSourceSplitEnumerator.java:230-238`
```java
@Override
public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
    if (!(sourceEvent instanceof FileSplitFinishedEvent)) {
        return;
    }
    String splitId = ((FileSplitFinishedEvent) sourceEvent).getSplitId();
    synchronized (lock) {
        inFlightSplits.removeIf(s -> Objects.equals(s.splitId(), splitId));
    }
}
```
**Related Context**:
- State persistence: Lines 217-222 `snapshotState()`
- Reader side: `MultipleTableFileSourceReader.java:93-100`
**Problem Description**:
After a split is assigned to a Reader:
1. It moves from `pendingSplits` to `inFlightSplits` (line 199)
2. If the Reader successfully processes the file and sends
`FileSplitFinishedEvent`, the split is removed from `inFlightSplits`
3. **However**: If the Reader crashes before sending the event (e.g., read
succeeds but network failure before sending the event), this split will remain
in `inFlightSplits`
4. After recovery, this split will not be reassigned (because it is not in
`pendingSplits`), nor will it be removed from `inFlightSplits`
5. **Result**: This split permanently remains in the state, and the file may
have already been processed (if the sink write succeeded)
**Potential Risks**:
- State continues to grow (although it will not grow indefinitely, because
new splits may overwrite old splitIds)
- If the sink does not support idempotent writes, it may lead to duplicate
processing
- Hard to debug: it is impossible to tell which in-flight splits are genuinely being processed and which have leaked
**Impact Scope**:
- Direct impact: All scenarios using `discovery_mode=CONTINUOUS`
- Impact surface: Single Connector (File)
**Severity**: MAJOR
**Improvement Suggestions**:
```java
// Add a timestamp to track when splits were assigned
private final Map<String, Long> inFlightSplitTimestamps = new HashMap<>();

@Override
public void handleSplitRequest(int subtaskId) {
    List<FileSourceSplit> assign = new ArrayList<>(DEFAULT_ASSIGN_BATCH_SIZE);
    synchronized (lock) {
        while (assign.size() < DEFAULT_ASSIGN_BATCH_SIZE && !pendingSplits.isEmpty()) {
            FileSourceSplit split = pendingSplits.pollFirst();
            if (split == null) {
                break;
            }
            pendingSplitIds.remove(split.splitId());
            inFlightSplits.add(split);
            inFlightSplitTimestamps.put(split.splitId(), System.currentTimeMillis());
            assign.add(split);
        }
    }
    // ... rest of the method
}

@Override
public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
    if (!(sourceEvent instanceof FileSplitFinishedEvent)) {
        return;
    }
    String splitId = ((FileSplitFinishedEvent) sourceEvent).getSplitId();
    synchronized (lock) {
        inFlightSplits.removeIf(s -> Objects.equals(s.splitId(), splitId));
        inFlightSplitTimestamps.remove(splitId);
    }
}

// In addSplitsBack(), also remove the timestamp
@Override
public void addSplitsBack(List<FileSourceSplit> splits, int subtaskId) {
    if (splits == null || splits.isEmpty()) {
        return;
    }
    synchronized (lock) {
        for (FileSourceSplit split : splits) {
            inFlightSplits.remove(split);
            inFlightSplitTimestamps.remove(split.splitId());
            enqueueSplitIfAbsent(split);
        }
    }
    handleSplitRequest(subtaskId);
}

// Clean up stale in-flight splits; could be called periodically or during snapshot
private void cleanupStaleInFlightSplits() {
    long now = System.currentTimeMillis();
    long staleThreshold = scanInterval.toMillis() * 10; // e.g. 10 scan intervals
    synchronized (lock) {
        Iterator<Map.Entry<String, Long>> iter = inFlightSplitTimestamps.entrySet().iterator();
        while (iter.hasNext()) {
            Map.Entry<String, Long> entry = iter.next();
            if (now - entry.getValue() > staleThreshold) {
                log.warn("Removing stale in-flight split: {}", entry.getKey());
                inFlightSplits.removeIf(s -> Objects.equals(s.splitId(), entry.getKey()));
                iter.remove();
            }
        }
    }
}
```
**Rationale**:
1. Tracking split allocation time can identify leaked splits
2. Periodically cleaning up leaked splits can avoid state growth
3. Cleanup strategy should be conservative (e.g., 10x scan interval) to
avoid accidental deletion
4. This behavior needs to be documented and users should be advised to
ensure sink idempotency
**Or (simpler solution)**:
Explicitly document limitations and verify sink configuration at startup:
```java
// In validateContinuousDiscoveryConfig()
if (sinkConfig.get("sink.type").equals("...")) {
    log.warn(
            "discovery_mode=continuous requires an idempotent sink to guarantee exactly-once. "
                    + "If your sink is not idempotent, duplicate processing may occur after failure.");
}
```
---
### Issue 4: Performance issues in massive file scenarios are not sufficiently documented
**Location**: `ContinuousMultipleTableFileSourceSplitEnumerator.java:422-469`
```java
private List<FileStatus> listFilesRecursively(String path) throws IOException {
    List<FileStatus> files = new ArrayList<>();
    FileStatus[] statuses = sourceFs.listStatus(path);
    for (FileStatus status : statuses) {
        if (status.isDirectory()) {
            // Recursive call
            files.addAll(listFilesRecursively(status.getPath().toString()));
        }
        // ... filtering logic
    }
    return files;
}
```
**Related Context**:
- Caller: Line 260 `scanOnce()`
- Performance bottleneck: Line 489 `targetFs.getFileStatus(targetFilePath)`
**Problem Description**:
Each scan will:
1. Recursively traverse the entire directory tree (lines 422-469)
2. Call `targetFs.getFileStatus()` for each file (line 489)
3. In some cases, perform content comparison (lines 592-615)
In massive file scenarios (e.g., 1 million small files):
- Each scan may take several minutes
- If `scan_interval=10S` but a scan takes 5 minutes, the effective interval will be far larger than the configured value
- Note: `scheduleWithFixedDelay` waits for the previous run to finish before starting the next delay, so scans scheduled that way will not overlap; the symptom is unbounded drift of the effective interval. Overlapping scans would only occur if the task were scheduled with `scheduleAtFixedRate` on a multi-threaded executor or triggered manually
**Potential Risks**:
- Serious performance degradation in massive-file scenarios
- The file system may be overloaded
- Users may be unaware of this limitation, and misconfiguration can cause the job to hang
**Impact Scope**:
- Direct impact: All users using `discovery_mode=CONTINUOUS`
- Indirect impact: Other jobs sharing the file system
**Severity**: MAJOR
**Improvement Suggestions**:
1. **Add scan timeout protection**:
```java
private final Object lock = new Object();
private volatile boolean isScanning = false;

private void safeScanOnce() {
    if (closed) {
        return;
    }
    synchronized (lock) {
        if (isScanning) {
            log.warn("Previous scan is still in progress, skipping this scheduled scan.");
            return;
        }
        isScanning = true;
    }
    try {
        long scanStart = System.currentTimeMillis();
        scanOnce();
        long scanDuration = System.currentTimeMillis() - scanStart;
        if (scanDuration > scanInterval.toMillis() / 2) {
            log.warn(
                    "Scan took {} ms, which is more than half of scan_interval {} ms. "
                            + "Consider increasing scan_interval or reducing the number of files.",
                    scanDuration,
                    scanInterval.toMillis());
        }
    } catch (Exception e) {
        log.warn("Continuous discovery scan failed, will retry in next interval.", e);
    } finally {
        synchronized (lock) {
            isScanning = false;
        }
    }
}
```
2. **Explicitly document performance limitations**:
```markdown
### Performance Considerations
- Each scan recursively traverses the entire directory tree. For scenarios
with a large number of files (e.g., >100,000), the scan may take significant
time.
- Each source file triggers a `getFileStatus` call on the target filesystem
in update mode.
- Recommended `scan_interval` based on file count:
- <1,000 files: 10S
- 1,000-10,000 files: 30S-60S
- >10,000 files: 300S or more
- If the scan duration exceeds `scan_interval`, subsequent scans will be
skipped until the current scan completes.
```
3. **Add Metrics**:
```java
// In scanOnce()
long scanStart = System.nanoTime();
int scanned = 0;
// ... scanning logic
long scanDurationMs = (System.nanoTime() - scanStart) / 1_000_000;
log.info("Scan completed: {} files scanned in {} ms", scanned,
scanDurationMs);
// TODO: Report to metrics system
```
**Rationale**:
1. Prevent scan task pile-up from resource exhaustion
2. Help users diagnose performance issues
3. Provide configuration guidance to avoid user misuse
---
### Issue 5: Default values in documentation are inconsistent with code
**Location**: `FileBaseSourceOptions.java:42-47` vs documentation
**Code**:
```java
public static final Option<Duration> SCAN_INTERVAL =
        Options.key("scan_interval")
                .durationType()
                .defaultValue(Duration.ofSeconds(10))
                .withDescription(
                        "Scan interval for discovery_mode=continuous. "
                                + "Recommended shorthand format is 10S; ISO-8601 format PT10S "
                                + "is also supported. Default is 10S.");
```
**Documentation (LocalFile.md:432)**:
```
Only used when `discovery_mode=continuous`. Scan interval for periodic
discovery, recommended shorthand format `10S`, `30S`; ISO-8601 format `PT10S`,
`PT30S` is also supported. Default is `10S`.
```
**Problem Description**:
1. The default value in the code is `Duration.ofSeconds(10)`, i.e., PT10S or
10S
2. The documentation says "Default is 10S"
3. **However**: In the example configuration (LocalFile.md:695), `"10S"`
(uppercase) is used
4. According to the implementation of `ConfigUtil.convertToDuration()` (line
199), input will be `toUpperCase()`, so `"10s"` and `"10S"` are equivalent
5. **However**: The default value description in the documentation may
mislead users into thinking they must use uppercase
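The case equivalence can be checked in isolation; this sketch mirrors the described normalization step rather than calling `ConfigUtil` itself:

```java
import java.util.Locale;

public class CaseInsensitiveShorthandDemo {
    public static void main(String[] args) {
        // Mirrors the normalization step described for ConfigUtil.convertToDuration():
        String lower = "10s".replaceAll("\\s+", "").toUpperCase(Locale.ROOT);
        String upper = "10S".replaceAll("\\s+", "").toUpperCase(Locale.ROOT);
        // Both inputs normalize to "10S", so shorthand parsing treats them identically.
        System.out.println(lower.equals(upper));
    }
}
```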
**Potential Risks**:
- Users may think they must use uppercase `"10S"`, when in fact lowercase
`"10s"` is also supported
- The documentation's format description is not clear enough, and users may
not know that both formats are supported
**Impact Scope**:
- Direct impact: Users reading the documentation
- Impact surface: Users of all 4 File connectors
**Severity**: MINOR
**Improvement Suggestions**:
Unify documentation description and explicitly specify format support:
```markdown
### scan_interval [string]
Only used when `discovery_mode=continuous`. Scan interval for periodic
discovery.
**Supported formats:**
- Shorthand (recommended): `10S`, `30s`, `500MS` (case-insensitive)
- ISO-8601: `PT10S`, `PT30S`
Default is `10S` (equivalent to `PT10S` or `10s`).
```
**Rationale**:
1. Explicitly state that both formats are supported
2. Explain case insensitivity
3. Avoid user confusion
---
### Issue 6: Tests are disabled on Windows, leaving the feature unverified there
**Location**: `ContinuousMultipleTableFileSourceSplitEnumeratorTest.java:50`
```java
@DisabledOnOs(OS.WINDOWS)
class ContinuousMultipleTableFileSourceSplitEnumeratorTest {
    // ...
}
```
**Related Context**:
- Test method: Lines 56-88 `testScanOnceEnqueueAssignAndAck()`
- Uses `@TempDir` and `Files.createDirectories()`
**Problem Description**:
The test class has already been disabled by `@DisabledOnOs(OS.WINDOWS)`,
which indicates:
1. The test cannot run on Windows (possibly due to file system API
incompatibility)
2. **However**: The PR modifications do not explicitly state that this
feature does not support Windows
3. If users run continuous mode on Windows, they may encounter untested
issues
**Potential Risks**:
- Windows users using this feature may encounter problems
- Problems may only be exposed in specific scenarios (e.g., path separators,
file permissions, etc.)
**Impact Scope**:
- Direct impact: Users running SeaTunnel on Windows
- Impact surface: All 4 File connectors
**Severity**: MINOR
**Improvement Suggestions**:
1. **Document platform support**:
```markdown
## Platform Support
- `discovery_mode=continuous` is tested on Linux and macOS.
- Windows is not currently supported due to filesystem API differences.
- If you need to run on Windows, use `discovery_mode=once` instead.
```
2. **Or**: Fix Windows compatibility issues and remove
`@DisabledOnOs(OS.WINDOWS)`
**Rationale**:
1. Explicitly document platform limitations to avoid user misuse
2. If planning to support Windows, corresponding tests should be added
---
### Issue 7: Lack of handling for concurrent file modifications during scanning
**Location**: `ContinuousMultipleTableFileSourceSplitEnumerator.java:256-287`
```java
private void scanOnce() throws IOException {
    int scanned = 0;
    int queued = 0;
    for (TableScanContext ctx : tableScanContexts) {
        List<FileStatus> files = ctx.listFilesRecursively(ctx.rootPath);
        scanned += files.size();
        for (FileStatus fileStatus : files) {
            if (!ctx.shouldProcess(fileStatus, jobStartTimeMillis, startMode)) {
                continue;
            }
            for (FileSourceSplit split : ctx.toSplits(fileStatus)) {
                if (enqueueSplitIfAbsent(split)) {
                    queued++;
                }
            }
        }
    }
    // ...
}
```
**Related Context**:
- File status retrieval: Line 424 `sourceFs.listStatus(path)`
- File reading: Reader side's `ReadStrategy`
**Problem Description**:
The following race conditions may occur during scanning:
1. **File deleted after scanning**: File exists during scanning, but has
been deleted when assigned to Reader
2. **File modified after scanning**: File size is 100MB during scanning, but
becomes 200MB when Reader reads it (append write)
3. **File moved during scanning**: At path A during scanning, moved to path
B when reading
Current code:
- Does not handle these race conditions
- If the file is deleted, the Reader may throw `FileNotFoundException`
- If the file is modified, the Reader may read incomplete data
**Potential Risks**:
- In high-concurrency file operation scenarios, it may lead to:
- Job failure (file does not exist)
- Data inconsistency (reading partially written files)
- Duplicate processing (file rescanned after modification)
**Impact Scope**:
- Direct impact: All scenarios using `discovery_mode=CONTINUOUS`
- Impact surface: Single Connector (File)
**Severity**: MAJOR
**Improvement Suggestions**:
Add file status validation on Reader side:
```java
// In MultipleTableFileSourceReader.pollNext()
@Override
public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
    FileSourceSplit split;
    synchronized (output.getCheckpointLock()) {
        split = sourceSplits.poll();
        if (split != null) {
            try {
                // Verify the file still exists before reading
                // (this requires passing FileSystemProxy to the reader)
                if (!fileSystem.exists(split.path())) {
                    log.warn("File {} no longer exists, skipping split", split.path());
                    return; // Skip this split
                }
                ReadStrategy readStrategy = readStrategyMap.get(split.getTableId());
                // ... rest of reading logic
            } catch (Exception e) {
                // ... error handling
            }
        }
    }
    // ...
}
```
**Or**:
Document limitations:
```markdown
### Limitations
- Files deleted during scanning may cause task failures. Ensure files are
not deleted while being processed.
- Files modified during reading may result in inconsistent data. Use atomic
file operations (write to temp file, then rename).
```
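The "write to temp file, then rename" advice above can be sketched with `java.nio.file`; the directory and file names here are illustrative:

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class AtomicPublishDemo {
    public static void main(String[] args) throws Exception {
        Path dir = Files.createTempDirectory("atomic-demo");
        Path tmp = dir.resolve("data.csv.tmp");
        Path target = dir.resolve("data.csv");
        // Write the full content to a temporary file first...
        Files.write(tmp, "id,name\n1,foo\n".getBytes());
        // ...then publish it with a single atomic rename, so a scanner
        // never observes a partially written data.csv.
        Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE);
        System.out.println(Files.exists(target) && !Files.exists(tmp));
    }
}
```

Note that `ATOMIC_MOVE` is only guaranteed within the same filesystem; a cross-device rename would throw `AtomicMoveNotSupportedException`.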
**Rationale**:
1. Explicitly documenting limitations can help users avoid problems
2. To fully resolve this, a more complex file version control mechanism is
needed
---
### Issue 8: Minimum value of scan_interval is not restricted
**Location**: `FileBaseSourceOptions.java:42-47`
**Problem Description**:
The type of `scan_interval` is `Duration`; users can configure any value, including:
- `1MS` (1 millisecond)
- `0S` (zero; `ScheduledExecutorService.scheduleWithFixedDelay` rejects a non-positive delay with `IllegalArgumentException`)
- Negative values (contrary to a common assumption, `Duration` does support them: ISO-8601 input such as `PT-10S` parses to a negative duration)
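For reference, `java.time.Duration` accepts zero and ISO-8601 negative values, which is why explicit range validation is needed:

```java
import java.time.Duration;

public class NegativeDurationDemo {
    public static void main(String[] args) {
        // ISO-8601 allows negative components, so this parses successfully:
        Duration d = Duration.parse("PT-10S");
        System.out.println(d.isNegative());
        // Zero also parses without complaint:
        System.out.println(Duration.parse("PT0S").isZero());
    }
}
```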
If the user configures too small a `scan_interval`:
1. Scan tasks may pile up
2. The file system may be overloaded
3. SeaTunnel Engine may consume a lot of resources
**Potential Risks**:
- User misconfiguration leads to system instability
- No clear error message
**Impact Scope**:
- Direct impact: All users using `discovery_mode=CONTINUOUS`
- Impact surface: All 4 File connectors
**Severity**: MINOR
**Improvement Suggestions**:
1. **Add minimum value check in validation logic**:
```java
private static void validateContinuousDiscoveryConfig(List<BaseFileSourceConfig> configs) {
    // ... existing validation ...
    Duration minInterval = Duration.ofSeconds(1); // Minimum 1 second
    if (scanInterval.compareTo(minInterval) < 0) {
        throw new FileConnectorException(
                SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
                String.format(
                        "scan_interval must be at least %s, but got %s",
                        minInterval, scanInterval));
    }
}
```
2. **Document recommended range**:
```markdown
### scan_interval [string]
Only used when `discovery_mode=continuous`. Scan interval for periodic
discovery.
**Supported formats:**
- Shorthand (recommended): `10S`, `30s`, `500MS` (case-insensitive)
- ISO-8601: `PT10S`, `PT30S`
**Minimum value:** `1S` (1 second)
Recommended range:
- For low-latency scenarios: `10S`-`60S`
- For large file counts: `300S` or more
Default is `10S` (equivalent to `PT10S` or `10s`).
```
**Rationale**:
1. Prevent user misconfiguration
2. Provide clear configuration guidance
3. Minimum value of 1 second is a reasonable balance point
---
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]