DanielCarter-stack commented on PR #10563:
URL: https://github.com/apache/seatunnel/pull/10563#issuecomment-4003649345

   <!-- code-pr-reviewer -->
   <!-- cpr:pr_reply_v2_parts {"group": "apache/seatunnel#10563", "part": 1, "total": 1} -->
   ### Issue 1: Operations may be executed repeatedly after state recovery
   
    **Location**: `ContinuousMultipleTableFileSourceSplitEnumerator.commitPostSyncOperations:847-863`
   
   **Related context**:
   - Caller: `CheckpointListener.notifyCheckpointComplete()`
   - Dependency classes: `FileSourceOperationState`, `FileSourceState`
   
   **Issue description**:
    The code removes all pending operations from `pendingOpsByCheckpoint` before committing them, then commits them one by one. If a commit fails partway through (e.g., a network interruption after the file operation has already taken effect), the operation is counted as failed and re-queued, so it is executed again at the next checkpoint; and if an uncaught exception aborts the commit loop, the operations that were removed up front are dropped from the in-memory pending map entirely.
   
   ```java
   // Current implementation problematic code
   synchronized (lock) {
       for (Long cp : toCommit.keySet()) {
            pendingOpsByCheckpoint.remove(cp);  // ← Removed first, but may fail later
       }
   }
   // ... commit operation ...
   for (FileSourceOperationState op : entry.getValue()) {
       attempted++;
        OpCommitResult result = commitSingleOperation(op);  // ← May throw uncaught exception
       if (result == OpCommitResult.SUCCESS) {
           succeeded++;
       } else {
           failed++;
           op.increaseRetryCount();
           remaining.add(op);  // ← Failed operation re-added
       }
   }
   ```
   
   **Potential risks**:
    - **Risk 1**: delete operations are executed repeatedly (usually idempotent, but each repeat emits WARN logs)
    - **Risk 2**: backup operations may create duplicate versioned copies (e.g., more than one `file.bin.v100_123456`-style file for the same source)
   
   **Impact scope**:
   - Direct impact: `commitPostSyncOperations()` method
   - Indirect impact: checkpoint completion callback logic
   - Affected area: all continuous mode tasks configured with `post_sync_action`
   
   **Severity**: Medium
   
   **Improvement suggestions**:
   
   ```java
   // Suggested improved code
   private void commitPostSyncOperations(long checkpointId) {
       Map<Long, List<FileSourceOperationState>> toCommit = new TreeMap<>();
       synchronized (lock) {
           for (Map.Entry<Long, List<FileSourceOperationState>> entry :
                    pendingOpsByCheckpoint.headMap(checkpointId, true).entrySet()) {
                toCommit.put(entry.getKey(), copyOperationStates(entry.getValue()));
           }
       }
       if (toCommit.isEmpty()) {
           return;
       }
   
       long attempted = 0L;
       long succeeded = 0L;
       long failed = 0L;
       long staleSkipped = 0L;
        Map<Long, List<FileSourceOperationState>> remainingByCheckpoint = new TreeMap<>();
       Set<Long> successfullyCommittedCheckpoints = new HashSet<>();  // ← Added
   
        for (Map.Entry<Long, List<FileSourceOperationState>> entry : toCommit.entrySet()) {
           List<FileSourceOperationState> remaining = new ArrayList<>();
           boolean allSucceeded = true;  // ← Added
           
           for (FileSourceOperationState op : entry.getValue()) {
               attempted++;
               try {
                   OpCommitResult result = commitSingleOperation(op);
                   if (result == OpCommitResult.SUCCESS) {
                       succeeded++;
                       incCounter(postSyncSucceededCounter);
                   } else if (result == OpCommitResult.STALE_SKIPPED) {
                       staleSkipped++;
                       incCounter(postSyncStaleSkippedCounter);
                   } else {
                       failed++;
                       incCounter(postSyncFailedCounter);
                       op.increaseRetryCount();
                       remaining.add(op);
                       allSucceeded = false;  // ← Mark as failed
                   }
               } catch (Exception e) {  // ← Catch unexpected exception
                    log.error("Unexpected error committing post-sync operation", e);
                   failed++;
                   incCounter(postSyncFailedCounter);
                   op.increaseRetryCount();
                   remaining.add(op);
                   allSucceeded = false;
               }
           }
           
           if (allSucceeded && remaining.isEmpty()) {
                successfullyCommittedCheckpoints.add(entry.getKey());  // ← Mark only if all succeed
           }
           if (!remaining.isEmpty()) {
               remainingByCheckpoint.put(entry.getKey(), remaining);
           }
       }
   
       synchronized (lock) {
           // ← Only remove fully successful checkpoints
           for (Long cp : successfullyCommittedCheckpoints) {
               pendingOpsByCheckpoint.remove(cp);
           }
           // Failed ones kept or updated
           for (Map.Entry<Long, List<FileSourceOperationState>> entry :
                   remainingByCheckpoint.entrySet()) {
               pendingOpsByCheckpoint.put(entry.getKey(), entry.getValue());
           }
       }
   
       log.info(
                "Post-sync commit finished for checkpoint {}: attempted={}, success={}, stale_skipped={}, failed={}, remaining_checkpoints={}",
               checkpointId,
               attempted,
               succeeded,
               staleSkipped,
               failed,
               remainingByCheckpoint.size());
   }
   ```
   
    **Rationale**:
    - Remove a checkpoint from `pendingOpsByCheckpoint` only after every one of its operations has succeeded; failed operations stay queued for retry
    - Prevents operation loss or duplication when a commit batch partially succeeds
    - Aligns with checkpoint semantics (a checkpoint's post-sync work completes as a unit)
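    To make the ordering concrete, here is a minimal, self-contained sketch of the same remove-only-after-full-success pattern. All names are hypothetical: `commitOp` stands in for `commitSingleOperation`, and plain strings stand in for `FileSourceOperationState`:

    ```java
    import java.util.*;
    import java.util.function.Predicate;

    public class PendingOpsDemo {
        /**
         * Commits all ops up to checkpointId; a checkpoint is removed from the
         * pending map only when every one of its ops committed successfully.
         */
        static void commit(
                NavigableMap<Long, List<String>> pending,
                long checkpointId,
                Predicate<String> commitOp) {
            Map<Long, List<String>> remainingByCheckpoint = new TreeMap<>();
            Set<Long> fullySucceeded = new HashSet<>();
            for (Map.Entry<Long, List<String>> e :
                    pending.headMap(checkpointId, true).entrySet()) {
                List<String> failed = new ArrayList<>();
                for (String op : e.getValue()) {
                    if (!commitOp.test(op)) {
                        failed.add(op); // failed op stays queued for retry
                    }
                }
                if (failed.isEmpty()) {
                    fullySucceeded.add(e.getKey()); // safe to drop this checkpoint
                } else {
                    remainingByCheckpoint.put(e.getKey(), failed);
                }
            }
            fullySucceeded.forEach(pending::remove);
            remainingByCheckpoint.forEach(pending::put);
        }
    }
    ```

    With this ordering, a crash between the commit loop and the final map update can at worst re-run already-committed operations, never silently drop pending ones.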
   
   ---
   
    ### Issue 2: Synchronous file operations in checkpoint completion callback may cause blocking
   
    **Location**: `ContinuousMultipleTableFileSourceSplitEnumerator.notifyCheckpointComplete:311-312`
   
   **Related context**:
    - Interface contract: `CheckpointListener.notifyCheckpointComplete(long checkpointId)`
    - Call chain: SeaTunnel Engine → SourceSplitEnumerator → commitPostSyncOperations → HadoopFileSystemProxy.deleteFile/renameFile
   
   **Issue description**:
   The `notifyCheckpointComplete` callback is executed synchronously. If a 
single checkpoint contains a large number of file operations (e.g., delete 
operations for 100k files), it will block the checkpoint completion 
notification, potentially leading to:
   - Checkpoint timeout
   - Delayed triggering of the next checkpoint
   - Overall job performance degradation
   
   ```java
   @Override
   public void notifyCheckpointComplete(long checkpointId) {
        commitPostSyncOperations(checkpointId);  // ← Synchronous execution, may take a long time
        runRetentionIfNeeded(checkpointId);      // ← Synchronous execution, may take a long time
   }
   ```
   
   **Potential risks**:
   - **Risk 1**: checkpoint time exceeds `checkpoint.interval`, causing 
backpressure
   - **Risk 2**: on remote file systems such as HDFS/FTP, operation latency is 
amplified
   
   **Impact scope**:
   - Direct impact: `notifyCheckpointComplete()` method
   - Indirect impact: checkpoint coordination logic
   - Affected area: all continuous mode tasks (especially in high-throughput 
scenarios)
   
   **Severity**: Medium
   
   **Improvement suggestions**:
   
   ```java
    // Option 1: Async execution (simplified version)
    private ExecutorService postSyncExecutor;  // assigned in open(), so it cannot be final

    @Override
    public void open() {
        // ... existing code ...
        postSyncExecutor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "file-source-post-sync"));
    }
   
   @Override
   public void notifyCheckpointComplete(long checkpointId) {
       final long cpId = checkpointId;
       postSyncExecutor.submit(() -> {
           try {
               commitPostSyncOperations(cpId);
               runRetentionIfNeeded(cpId);
           } catch (Exception e) {
                log.error("Post-sync execution failed for checkpoint {}", cpId, e);
           }
       });
   }
   
   @Override
   public void close() throws IOException {
       closed = true;
       if (scheduler != null) {
           scheduler.shutdownNow();
       }
       if (postSyncExecutor != null) {  // ← Added
           postSyncExecutor.shutdownNow();
           try {
               if (!postSyncExecutor.awaitTermination(30, TimeUnit.SECONDS)) {
                    log.warn("Post-sync executor did not terminate within 30 seconds.");
               }
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
           }
       }
       // ... existing close logic ...
   }
   ```
   
   **Rationale**:
   - Avoid blocking checkpoint callbacks
   - Maintain operation order (single-threaded executor)
    - Trade-off: asynchronous execution can briefly leave operations out of sync with checkpoint state, but this is mitigated because pending operations are persisted via `pendingOpsByCheckpoint`
   
   **Alternative approaches**:
   - Batch commit: limit the number of operations processed per checkpoint 
(e.g., maximum 1000)
   - Timeout control: add timeout parameter to `commitSingleOperation`
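    The timeout-control alternative could look roughly like the sketch below. This is an illustration only; `commitWithTimeout` is a hypothetical helper and the `Callable` stands in for a single file operation:

    ```java
    import java.util.concurrent.*;

    public class TimedCommit {
        /**
         * Runs one commit on the given executor and bounds how long the caller
         * waits for it. A timed-out or failed commit returns false, so the
         * caller can keep the operation pending for the next retry round.
         */
        static boolean commitWithTimeout(
                ExecutorService executor, Callable<Boolean> op, long timeoutMs) {
            Future<Boolean> f = executor.submit(op);
            try {
                return f.get(timeoutMs, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                f.cancel(true); // interrupt the slow file operation
                return false;
            } catch (Exception e) {
                return false;   // execution or interruption failure
            }
        }
    }
    ```

    A timed-out operation is treated like any other failure: it stays in `pendingOpsByCheckpoint` and is retried at the next checkpoint.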
   
   ---
   
   ### Issue 3: Infinite retry mechanism may cause accumulation
   
    **Location**: `ContinuousMultipleTableFileSourceSplitEnumerator.commitSingleOperation:879-896`
   
   **Related context**:
   - Caller: `commitPostSyncOperations`
   - State: `FileSourceOperationState.retryCount`
   
   **Issue description**:
    Failed operations are retried, but there is no retry limit or alerting mechanism, which can lead to:
    - Continuous growth of `pendingOpsByCheckpoint`
    - Failed operations occupying memory indefinitely
    - No alerts, so users cannot notice the problem
   
   ```java
   if (result == OpCommitResult.SUCCESS) {
       succeeded++;
   } else {
       failed++;
       op.increaseRetryCount();  // ← Unlimited accumulation
       remaining.add(op);
   }
   ```
   
   **Potential risks**:
   - **Risk 1**: If the backup path is unavailable, operations will retry 
infinitely
   - **Risk 2**: Memory leak (large accumulation of failed operations)
   
   **Impact scope**:
   - Direct impact: `commitPostSyncOperations` method
   - Affected area: all tasks configured with post_sync_action
   
   **Severity**: Low (but requires long-term monitoring)
   
   **Improvement suggestions**:
   
   ```java
   // Suggested improved code
   private static final int MAX_POST_SYNC_RETRY_COUNT = 3;
   
   private void commitPostSyncOperations(long checkpointId) {
       // ... existing code ...
       for (FileSourceOperationState op : entry.getValue()) {
           attempted++;
           OpCommitResult result = commitSingleOperation(op);
           if (result == OpCommitResult.SUCCESS) {
               succeeded++;
               incCounter(postSyncSucceededCounter);
           } else if (result == OpCommitResult.STALE_SKIPPED) {
               staleSkipped++;
               incCounter(postSyncStaleSkippedCounter);
           } else {
               failed++;
               incCounter(postSyncFailedCounter);
               op.increaseRetryCount();
               
               // ← Added: give up after retry limit reached
               if (op.getRetryCount() >= MAX_POST_SYNC_RETRY_COUNT) {
                   log.error(
                            "Post-sync operation exceeded max retry count ({}), giving up. action={}, splitId={}, source={}",
                           MAX_POST_SYNC_RETRY_COUNT,
                           op.getAction(),
                           op.getSplitId(),
                           maskUriUserInfo(op.getSourcePath()));
                   // Not re-added to remaining, equivalent to discarding
               } else {
                   remaining.add(op);
               }
           }
       }
       // ... existing code ...
   }
   ```
   
    **Rationale**:
    - Prevents infinite retries and unbounded accumulation
    - Preserves a clear error log for every dropped operation
    - Follows the fail-fast principle: surface persistent failures instead of retrying forever
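    The retry cap can also be demonstrated in isolation. In this hypothetical sketch, retry counts live in a plain map instead of `FileSourceOperationState`:

    ```java
    import java.util.*;

    public class RetryGate {
        static final int MAX_POST_SYNC_RETRY_COUNT = 3; // same cap as suggested above

        /**
         * One failed commit round: bump each op's retry count and return only
         * the ops still under the cap (the rest are dropped; the real code
         * would also emit an ERROR log for each drop).
         */
        static List<String> requeueAfterFailure(
                Map<String, Integer> retryCounts, List<String> failedOpIds) {
            List<String> remaining = new ArrayList<>();
            for (String opId : failedOpIds) {
                int count = retryCounts.merge(opId, 1, Integer::sum); // increaseRetryCount()
                if (count < MAX_POST_SYNC_RETRY_COUNT) {
                    remaining.add(opId); // still eligible for retry
                }
                // else: give up, mirroring the log.error + discard branch
            }
            return remaining;
        }
    }
    ```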
   
   ---
   
    ### Issue 4: HadoopFileSystemProxy may fail after serialization and deserialization
   
   **Location**: `TableScanContext` constructor (inner class, approximately 
line 650)
   
   **Related context**:
   - `TableScanContext.sourceFs` and `targetFs` fields
   - `HadoopFileSystemProxy` implements `Serializable`, but contains 
`transient` fields
   
   **Issue description**:
    `TableScanContext` is held as a field of `ContinuousMultipleTableFileSourceSplitEnumerator`. In practice only `FileSourceState` is serialized into checkpoint state, so the enumerator itself never goes through Java serialization today. However, if the enumerator (or `TableScanContext`) were ever serialized, the `transient` fields of `HadoopFileSystemProxy` (`userGroupInformation`, `fileSystem`) would deserialize as `null` and cause failures.
   
   **Current impact**:
    - ✅ The enumerator is not actually serialized (only `FileSourceState` is), so there is no problem today
    - ⚠️ But the design makes this easy to get wrong: nothing marks `TableScanContext` as unsafe for checkpoint state
   
   **Severity**: Low (design potential issue)
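    For illustration, here is a tiny self-contained demo of the underlying Java behavior: after a serialize/deserialize round trip, `transient` fields come back as `null`. The `fileSystem` field below is just a stand-in for the proxy's real handle:

    ```java
    import java.io.*;

    public class TransientDemo implements Serializable {
        final String path;
        transient Object fileSystem; // stand-in for the non-serializable FS handle

        TransientDemo(String path) {
            this.path = path;
            this.fileSystem = new Object(); // "opened" at construction time
        }

        // Serializes the object to bytes and reads it back.
        static TransientDemo roundTrip(TransientDemo in) {
            try {
                ByteArrayOutputStream bos = new ByteArrayOutputStream();
                ObjectOutputStream oos = new ObjectOutputStream(bos);
                oos.writeObject(in);
                oos.close();
                ObjectInputStream ois =
                        new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()));
                return (TransientDemo) ois.readObject();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }
    ```

    After `roundTrip`, `path` survives but `fileSystem` is `null`; any code assuming the handle was still open would fail exactly as described above.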
   
   **Improvement suggestions**:
   Add comments on `TableScanContext`:
   ```java
   /**
    * Table scan context holding file system proxies.
    * 
     * <p>Note: This class is NOT serializable due to HadoopFileSystemProxy's transient fields.
     * It should only be used within the enumerator's lifecycle and never included in checkpoint state.
    */
   private static final class TableScanContext implements AutoCloseable {
       // ... existing code ...
   }
   ```
   
   ---
   
    ### Issue 5: Key diagnostics are logged at DEBUG level and lost in production environments
   
    **Location**: `ContinuousMultipleTableFileSourceSplitEnumerator` uses `log.isDebugEnabled()` in multiple places
   
   **Issue description**:
    The code guards many messages behind `log.isDebugEnabled()`, but the default production log level is usually INFO, so these messages are never emitted even though they are exactly what operators need for troubleshooting.
   
   **Impact scope**:
   - Observability
   
   **Severity**: Low
   
    **Improvement suggestions**:
    Promote key information to INFO level:
    ```java
    // Current: DEBUG only, invisible at the default INFO level
    if (log.isDebugEnabled()) {
        log.debug("Assigned {} splits to reader {}", assign.size(), subtaskId);
    }

    // Suggested: INFO for large assignments, DEBUG otherwise
    if (assign.size() > 100) {
        log.info("Assigned {} splits to reader {}", assign.size(), subtaskId);
    } else if (log.isDebugEnabled()) {
        log.debug("Assigned {} splits to reader {}", assign.size(), subtaskId);
    }
    ```
   
   ---
