DanielCarter-stack commented on PR #10563:
URL: https://github.com/apache/seatunnel/pull/10563#issuecomment-4003649345
<!-- code-pr-reviewer -->
<!-- cpr:pr_reply_v2_parts {"group": "apache/seatunnel#10563", "part": 1,
"total": 1} -->
### Issue 1: Operations may be executed repeatedly after state recovery
**Location**:
`ContinuousMultipleTableFileSourceSplitEnumerator.commitPostSyncOperations:847-863`
**Related context**:
- Caller: `CheckpointListener.notifyCheckpointComplete()`
- Dependency classes: `FileSourceOperationState`, `FileSourceState`
**Issue description**:
The code removes all pending operations from `pendingOpsByCheckpoint` before
committing them, then commits the operations one by one. If an exception
occurs during the commit process (e.g., a network interruption), operations
that were already executed successfully may still be treated as failed and
re-executed at the next checkpoint.
```java
// Current problematic implementation
synchronized (lock) {
    for (Long cp : toCommit.keySet()) {
        pendingOpsByCheckpoint.remove(cp); // ← Removed first, but commit may fail later
    }
}
// ... commit operations ...
for (FileSourceOperationState op : entry.getValue()) {
    attempted++;
    OpCommitResult result = commitSingleOperation(op); // ← May throw an uncaught exception
    if (result == OpCommitResult.SUCCESS) {
        succeeded++;
    } else {
        failed++;
        op.increaseRetryCount();
        remaining.add(op); // ← Failed operation re-added
    }
}
```
**Potential risks**:
- **Risk 1**: delete operations are re-executed (usually idempotent, but they
generate WARN logs)
- **Risk 2**: backup operations may create duplicate versioned copies (e.g.,
`file.bin.v100_123456` appearing multiple times)
**Impact scope**:
- Direct impact: `commitPostSyncOperations()` method
- Indirect impact: checkpoint completion callback logic
- Affected area: all continuous mode tasks configured with `post_sync_action`
**Severity**: Medium
**Improvement suggestions**:
```java
// Suggested improvement
private void commitPostSyncOperations(long checkpointId) {
    Map<Long, List<FileSourceOperationState>> toCommit = new TreeMap<>();
    synchronized (lock) {
        for (Map.Entry<Long, List<FileSourceOperationState>> entry :
                pendingOpsByCheckpoint.headMap(checkpointId, true).entrySet()) {
            toCommit.put(entry.getKey(), copyOperationStates(entry.getValue()));
        }
    }
    if (toCommit.isEmpty()) {
        return;
    }
    long attempted = 0L;
    long succeeded = 0L;
    long failed = 0L;
    long staleSkipped = 0L;
    Map<Long, List<FileSourceOperationState>> remainingByCheckpoint = new TreeMap<>();
    Set<Long> successfullyCommittedCheckpoints = new HashSet<>(); // ← Added
    for (Map.Entry<Long, List<FileSourceOperationState>> entry : toCommit.entrySet()) {
        List<FileSourceOperationState> remaining = new ArrayList<>();
        boolean allSucceeded = true; // ← Added
        for (FileSourceOperationState op : entry.getValue()) {
            attempted++;
            try {
                OpCommitResult result = commitSingleOperation(op);
                if (result == OpCommitResult.SUCCESS) {
                    succeeded++;
                    incCounter(postSyncSucceededCounter);
                } else if (result == OpCommitResult.STALE_SKIPPED) {
                    staleSkipped++;
                    incCounter(postSyncStaleSkippedCounter);
                } else {
                    failed++;
                    incCounter(postSyncFailedCounter);
                    op.increaseRetryCount();
                    remaining.add(op);
                    allSucceeded = false; // ← Mark as failed
                }
            } catch (Exception e) { // ← Catch unexpected exceptions
                log.error("Unexpected error committing post-sync operation", e);
                failed++;
                incCounter(postSyncFailedCounter);
                op.increaseRetryCount();
                remaining.add(op);
                allSucceeded = false;
            }
        }
        if (allSucceeded && remaining.isEmpty()) {
            successfullyCommittedCheckpoints.add(entry.getKey()); // ← Mark only if all succeed
        }
        if (!remaining.isEmpty()) {
            remainingByCheckpoint.put(entry.getKey(), remaining);
        }
    }
    synchronized (lock) {
        // ← Only remove fully successful checkpoints
        for (Long cp : successfullyCommittedCheckpoints) {
            pendingOpsByCheckpoint.remove(cp);
        }
        // Failed checkpoints are kept or updated
        for (Map.Entry<Long, List<FileSourceOperationState>> entry :
                remainingByCheckpoint.entrySet()) {
            pendingOpsByCheckpoint.put(entry.getKey(), entry.getValue());
        }
    }
    log.info(
            "Post-sync commit finished for checkpoint {}: attempted={}, success={}, stale_skipped={}, failed={}, remaining_checkpoints={}",
            checkpointId,
            attempted,
            succeeded,
            staleSkipped,
            failed,
            remainingByCheckpoint.size());
}
```
**Rationale**:
- Ensure atomicity: either all operations of a checkpoint succeed, or all
are retained for retry
- Prevent operation loss or duplication in partial success scenarios
- Aligns with checkpoint semantics (checkpoint is atomic)
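As a complementary safeguard against Risk 2, the commit of a backup operation could also be made idempotent by checking whether the versioned target already exists. The sketch below is a hypothetical illustration using local `java.nio.file` APIs; the real connector would go through `HadoopFileSystemProxy`, and `backupIdempotent` is an invented name, not an existing method:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

/** Hypothetical sketch: an idempotent backup commit. If the versioned target
 *  already exists, a re-executed operation is treated as a success instead of
 *  producing another copy. */
public class IdempotentBackupSketch {
    static boolean backupIdempotent(Path source, Path versionedTarget) throws IOException {
        if (Files.exists(versionedTarget)) {
            return true; // already committed by a previous attempt, do nothing
        }
        Files.move(source, versionedTarget, StandardCopyOption.ATOMIC_MOVE);
        return true;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("backup-demo");
        Path src = Files.writeString(dir.resolve("file.bin"), "data");
        Path target = dir.resolve("file.bin.v100");
        backupIdempotent(src, target);
        // Re-executing the same operation is a no-op success, not a duplicate copy.
        backupIdempotent(dir.resolve("file.bin"), target);
        System.out.println(Files.exists(target) && !Files.exists(src));
    }
}
```

With this check in place, a re-delivered backup operation after a partial commit converges to the same final state instead of stacking up extra versioned files.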
---
### Issue 2: Synchronous file operations in checkpoint completion callback
may cause blocking
**Location**:
`ContinuousMultipleTableFileSourceSplitEnumerator.notifyCheckpointComplete:311-312`
**Related context**:
- Interface contract: `CheckpointListener.notifyCheckpointComplete(long
checkpointId)`
- Call chain: SeaTunnel Engine → SourceSplitEnumerator →
commitPostSyncOperations → HadoopFileSystemProxy.deleteFile/renameFile
**Issue description**:
The `notifyCheckpointComplete` callback runs synchronously. If a single
checkpoint carries a large number of file operations (e.g., deleting 100k
files), it blocks the checkpoint completion notification, potentially leading
to:
- Checkpoint timeouts
- Delayed triggering of the next checkpoint
- Overall job performance degradation
```java
@Override
public void notifyCheckpointComplete(long checkpointId) {
    commitPostSyncOperations(checkpointId); // ← Synchronous, may take a long time
    runRetentionIfNeeded(checkpointId);     // ← Synchronous, may take a long time
}
```
**Potential risks**:
- **Risk 1**: checkpoint time exceeds `checkpoint.interval`, causing
backpressure
- **Risk 2**: on remote file systems such as HDFS/FTP, operation latency is
amplified
**Impact scope**:
- Direct impact: `notifyCheckpointComplete()` method
- Indirect impact: checkpoint coordination logic
- Affected area: all continuous mode tasks (especially in high-throughput
scenarios)
**Severity**: Medium
**Improvement suggestions**:
```java
// Option 1: Asynchronous execution (simplified version)
private ExecutorService postSyncExecutor; // not final: assigned in open(), not the constructor

@Override
public void open() {
    // ... existing code ...
    postSyncExecutor = Executors.newSingleThreadExecutor(
            r -> new Thread(r, "file-source-post-sync"));
}

@Override
public void notifyCheckpointComplete(long checkpointId) {
    final long cpId = checkpointId;
    postSyncExecutor.submit(() -> {
        try {
            commitPostSyncOperations(cpId);
            runRetentionIfNeeded(cpId);
        } catch (Exception e) {
            log.error("Post-sync execution failed for checkpoint {}", cpId, e);
        }
    });
}

@Override
public void close() throws IOException {
    closed = true;
    if (scheduler != null) {
        scheduler.shutdownNow();
    }
    if (postSyncExecutor != null) { // ← Added
        postSyncExecutor.shutdownNow();
        try {
            if (!postSyncExecutor.awaitTermination(30, TimeUnit.SECONDS)) {
                log.warn("Post-sync executor did not terminate within 30 seconds.");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
    // ... existing close logic ...
}
```
**Rationale**:
- Avoids blocking checkpoint callbacks
- Preserves operation order (single-threaded executor)
- Trade-off: asynchronous execution may cause operations to lag behind
checkpoint state, but this is mitigated by persisting them in
`pendingOpsByCheckpoint`
**Alternative approaches**:
- Batch commit: limit the number of operations processed per checkpoint
(e.g., maximum 1000)
- Timeout control: add timeout parameter to `commitSingleOperation`
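The batch-commit alternative can be sketched as draining at most a fixed number of operations per callback and deferring the rest to the next checkpoint. This is an illustrative standalone sketch; `drainBatch` and the 1000-operation cap are invented here, not part of the PR:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Sketch of the batch-commit alternative: take at most maxOpsPerCommit
 *  operations per checkpoint callback; the remainder stays queued and is
 *  picked up by the next callback. */
public class BatchCommitSketch {
    static List<String> drainBatch(Deque<String> pending, int maxOpsPerCommit) {
        List<String> batch = new ArrayList<>();
        while (!pending.isEmpty() && batch.size() < maxOpsPerCommit) {
            batch.add(pending.poll());
        }
        return batch;
    }

    public static void main(String[] args) {
        Deque<String> pending = new ArrayDeque<>();
        for (int i = 0; i < 2500; i++) {
            pending.add("op-" + i);
        }
        List<String> batch = drainBatch(pending, 1000);
        // → 1000 committed, 1500 deferred
        System.out.println(batch.size() + " committed, " + pending.size() + " deferred");
    }
}
```

This bounds the time spent inside each `notifyCheckpointComplete` call at the cost of spreading large cleanups across several checkpoints.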
---
### Issue 3: Infinite retry mechanism may cause accumulation
**Location**:
`ContinuousMultipleTableFileSourceSplitEnumerator.commitSingleOperation:879-896`
**Related context**:
- Caller: `commitPostSyncOperations`
- State: `FileSourceOperationState.retryCount`
**Issue description**:
Failed operations are retried, but the code has no retry limit or alerting
mechanism, potentially leading to:
- Unbounded growth of `pendingOpsByCheckpoint`
- Failed operations permanently occupying memory
- No visible alerts, so users cannot notice the problem
```java
if (result == OpCommitResult.SUCCESS) {
    succeeded++;
} else {
    failed++;
    op.increaseRetryCount(); // ← Retry count accumulates without bound
    remaining.add(op);
}
```
**Potential risks**:
- **Risk 1**: if the backup path is unavailable, operations retry indefinitely
- **Risk 2**: unbounded memory growth from accumulated failed operations
**Impact scope**:
- Direct impact: `commitPostSyncOperations` method
- Affected area: all tasks configured with `post_sync_action`
**Severity**: Low (but requires long-term monitoring)
**Improvement suggestions**:
```java
// Suggested improvement
private static final int MAX_POST_SYNC_RETRY_COUNT = 3;

private void commitPostSyncOperations(long checkpointId) {
    // ... existing code ...
    for (FileSourceOperationState op : entry.getValue()) {
        attempted++;
        OpCommitResult result = commitSingleOperation(op);
        if (result == OpCommitResult.SUCCESS) {
            succeeded++;
            incCounter(postSyncSucceededCounter);
        } else if (result == OpCommitResult.STALE_SKIPPED) {
            staleSkipped++;
            incCounter(postSyncStaleSkippedCounter);
        } else {
            failed++;
            incCounter(postSyncFailedCounter);
            op.increaseRetryCount();
            // ← Added: give up once the retry limit is reached
            if (op.getRetryCount() >= MAX_POST_SYNC_RETRY_COUNT) {
                log.error(
                        "Post-sync operation exceeded max retry count ({}), giving up. action={}, splitId={}, source={}",
                        MAX_POST_SYNC_RETRY_COUNT,
                        op.getAction(),
                        op.getSplitId(),
                        maskUriUserInfo(op.getSourcePath()));
                // Not re-added to remaining, i.e. the operation is discarded
            } else {
                remaining.add(op);
            }
        }
    }
    // ... existing code ...
}
```
**Rationale**:
- Prevent infinite retries
- Preserve clear error logs
- Follows "fail-fast" principle
---
### Issue 4: HadoopFileSystemProxy may fail after serialization and
deserialization
**Location**: `TableScanContext` constructor (inner class, approximately
line 650)
**Related context**:
- `TableScanContext.sourceFs` and `targetFs` fields
- `HadoopFileSystemProxy` implements `Serializable`, but contains
`transient` fields
**Issue description**:
`TableScanContext` is held as a field of
`ContinuousMultipleTableFileSourceSplitEnumerator`. The enumerator itself is
not serialized today (only `FileSourceState` is), but if it ever needs to be
serialized, the `transient` fields of `HadoopFileSystemProxy`
(`userGroupInformation`, `fileSystem`) would cause problems.
**Current impact**:
- ✅ The enumerator is not serialized (only `FileSourceState` is), so there is
no issue right now
- ⚠️ But the design does not make this explicit, so the class is prone to
misuse
**Severity**: Low (design potential issue)
**Improvement suggestions**:
Add comments on `TableScanContext`:
```java
/**
 * Table scan context holding file system proxies.
 *
 * <p>Note: This class is NOT serializable due to HadoopFileSystemProxy's transient fields.
 * It should only be used within the enumerator's lifecycle and must not be included in
 * checkpoint state.
 */
private static final class TableScanContext implements AutoCloseable {
    // ... existing code ...
}
```
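Beyond a Javadoc note, the constraint could also be enforced at runtime: a private `writeObject` that throws turns accidental serialization into an immediate, descriptive failure instead of a confusing downstream error. The class below is a hypothetical demo, not the PR's actual code:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Demo: even if a class accidentally becomes Serializable (e.g. through an
 *  enclosing class hierarchy), a writeObject hook that throws makes the
 *  misuse fail loudly at serialization time. */
public class NonSerializableGuardDemo {
    static final class TableScanContextLike implements Serializable {
        private void writeObject(ObjectOutputStream out) throws IOException {
            throw new NotSerializableException(
                    "TableScanContext must not be included in checkpoint state");
        }
    }

    public static void main(String[] args) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(new TableScanContextLike());
            System.out.println("serialized (unexpected)");
        } catch (IOException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```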
---
### Issue 5: Debug-level logging may hide key information in production
**Location**: multiple `log.isDebugEnabled()` checks in
`ContinuousMultipleTableFileSourceSplitEnumerator`
**Issue description**:
The code contains many `log.isDebugEnabled()` checks, but the default log
level is typically INFO, so these debug messages are lost in production
environments, where they may be needed for troubleshooting.
**Impact scope**:
- Observability
**Severity**: Low
**Improvement suggestions**:
Promote key information to INFO level:
```java
// Current: DEBUG level, lost when the default level is INFO
if (log.isDebugEnabled()) {
    log.debug("Assigned {} splits to reader {}", assign.size(), subtaskId);
}
// Suggested: promote significant assignments to INFO, keep the rest at DEBUG
if (assign.size() > 100) {
    log.info("Assigned {} splits to reader {}", assign.size(), subtaskId);
} else if (log.isDebugEnabled()) {
    log.debug("Assigned {} splits to reader {}", assign.size(), subtaskId);
}
```
---
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]