davidzollo commented on PR #10418:
URL: https://github.com/apache/seatunnel/pull/10418#issuecomment-3848210057

   Good job!
   Here is a review generated by Claude Code. Please take a look at it first.
   
   ### Issue: Missing Retry Limit - Could Cause IMAP Memory Leak
   
   **Severity**:  **High**
   **File**: `CoordinatorService.java`
   **Location**: [CoordinatorService.java:497](seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java#L497)
   
   **Problem Description**:
   - The code tracks `attemptCount` but does not enforce a maximum retry limit
   - If a worker node permanently fails, TaskGroupContext cleanup will never succeed
   - Cleanup records will accumulate indefinitely in the IMAP, causing **the IMAP itself to leak memory**
   
   **Impact**:
   - In a large-scale cluster with frequent job failures, thousands of cleanup records could accumulate
   - Each `PipelineCleanupRecord` occupies roughly 500 bytes (estimate)
   - 10,000 records = ~5MB per copy, multiplied by the number of copies Hazelcast keeps (the primary plus `backup-count` backups)
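   To make the sizing concrete, here is a back-of-the-envelope calculation. The ~500 bytes per record is the estimate above, not a measurement, and the backup count of 1 is an assumption (it is Hazelcast's default IMap `backup-count`; the SeaTunnel cluster config may differ). The class and method names are illustrative only.

   ```java
   public class CleanupRecordFootprint {

       // Bytes held cluster-wide: each record is stored once as primary plus backupCount backups.
       static long estimateBytes(long records, long bytesPerRecord, int backupCount) {
           return records * bytesPerRecord * (1L + backupCount);
       }

       public static void main(String[] args) {
           // 10,000 records at ~500 bytes each, assuming the default IMap backup-count of 1
           long bytes = estimateBytes(10_000, 500, 1);
           System.out.println(bytes / 1_000_000.0 + " MB"); // 10.0 MB
       }
   }
   ```

   The absolute numbers stay small; the concern is that they grow without bound, not their size on any given day.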
   
   **Risk Scenario**:
   ```
   Day 1: 100 failed cleanup records
   Day 7: 700 failed cleanup records
   Day 30: 3,000 failed cleanup records
   → IMAP grows indefinitely, defeating the purpose of this PR
   ```
   
   **Recommended Fix**:
   ```java
   // Add this constant to CoordinatorService
   private static final int MAX_CLEANUP_ATTEMPTS = 100; // ~100 minutes of retries at 60s intervals
   
   private void processPendingPipelineCleanup(
           PipelineLocation pipelineLocation, PipelineCleanupRecord record) {
   
       // Add this check at the beginning: give up once the attempt budget is used up
       if (record.getAttemptCount() >= MAX_CLEANUP_ATTEMPTS) {
           logger.warning(String.format(
               "Pipeline %s cleanup reached max attempts (%d), giving up and removing record. "
                   + "Metrics cleaned: %s, TaskGroups cleaned: %d/%d",
               pipelineLocation, MAX_CLEANUP_ATTEMPTS,
               record.isMetricsImapCleaned(),
               record.getCleanedTaskGroups() != null ? record.getCleanedTaskGroups().size() : 0,
               record.getTaskGroups() != null ? record.getTaskGroups().size() : 0));
           removePendingCleanupRecord(pipelineLocation, record);
           return;
       }
   
       // Continue with existing cleanup logic...
   }
   ```
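   To show how the cap bounds a permanently failing record, here is a minimal, self-contained sketch. It assumes a `ConcurrentHashMap` in place of the Hazelcast IMap; `CleanupRecord`, `process`, and the `cleanupSucceeds` flag are hypothetical stand-ins for the PR's `PipelineCleanupRecord` and cleanup logic. It gives up once `attemptCount` reaches the limit.

   ```java
   import java.util.Map;
   import java.util.concurrent.ConcurrentHashMap;

   public class BoundedCleanupRetry {

       static final int MAX_CLEANUP_ATTEMPTS = 100; // at a 60s interval, ~100 minutes of retries

       // Hypothetical immutable stand-in for PipelineCleanupRecord: only tracks attempts.
       static final class CleanupRecord {
           final int attemptCount;

           CleanupRecord(int attemptCount) {
               this.attemptCount = attemptCount;
           }

           CleanupRecord nextAttempt() {
               return new CleanupRecord(attemptCount + 1);
           }
       }

       final Map<String, CleanupRecord> pending = new ConcurrentHashMap<>();

       // One scheduled pass for one entry; returns true if the record was removed.
       boolean process(String key, CleanupRecord rec, boolean cleanupSucceeds) {
           if (rec.attemptCount >= MAX_CLEANUP_ATTEMPTS) {
               // Attempt budget used up: drop the record so the map cannot grow without bound
               return pending.remove(key, rec);
           }
           if (cleanupSucceeds) {
               return pending.remove(key, rec);
           }
           // Failed attempt: bump the counter, but only if no one else changed the record
           pending.replace(key, rec, rec.nextAttempt());
           return false;
       }

       public static void main(String[] args) {
           BoundedCleanupRetry svc = new BoundedCleanupRetry();
           svc.pending.put("pipeline-1", new CleanupRecord(0));
           int passes = 0;
           // Simulate a permanently failed worker: cleanup never succeeds
           while (svc.pending.containsKey("pipeline-1")) {
               svc.process("pipeline-1", svc.pending.get("pipeline-1"), false);
               passes++;
           }
           System.out.println("record dropped after " + passes + " passes"); // 101 passes
       }
   }
   ```

   With a cleanup that always fails, the record survives 100 failed attempts and is dropped on the 101st pass. Using `remove(key, value)` and `replace(key, oldValue, newValue)` instead of plain `remove`/`put` avoids clobbering a record that another node updated in the meantime; the IMap offers the same conditional operations.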
   
   ---
   
   ### Issue: Concurrent IMAP Traversal - Risk of ConcurrentModificationException
   
   **Severity**:  **High**
   **File**: `CoordinatorService.java`
   **Location**: [CoordinatorService.java:477-480](seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java#L477-L480)
   
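   **Problem Description**:
   - `cleanupPendingPipelines()` iterates directly over `pendingCleanupIMap.entrySet()`
   - If another thread (e.g., JobMaster) modifies the IMAP during iteration, a `ConcurrentModificationException` could be thrown
   - Any exception that escapes the loop aborts the entire cleanup pass, so every pending pipeline has to wait 60 seconds for the next run
   - Note: Hazelcast documents `IMap.entrySet()` and `IMap.keySet()` as returning a copy that is not backed by the map, so a CME may not occur in practice; the more certain risk is that one failing entry aborts the whole pass. Worth verifying against the Hazelcast version SeaTunnel ships.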
   
   **Current Code**:
   ```java
   for (Map.Entry<PipelineLocation, PipelineCleanupRecord> entry :
           pendingCleanupIMap.entrySet()) {  // ← Unsafe: direct iteration
       processPendingPipelineCleanup(entry.getKey(), entry.getValue());
   }
   ```
   
   **Impact**:
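   - The cleanup pass aborts, and unless the exception is logged nothing indicates that it failed
   - Resources remain uncleaned until the next scheduled run (or indefinitely, if the exception escapes a task scheduled with `ScheduledExecutorService.scheduleAtFixedRate`, which suppresses all subsequent executions after an exception)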
   - In high-throughput clusters, this could happen frequently
   
   **Risk Scenario**:
   ```
   Thread 1 (Cleanup): Iterating pendingCleanupIMap.entrySet()
   Thread 2 (JobMaster): Adds new cleanup record to IMAP
   → Thread 1 throws ConcurrentModificationException
   → Cleanup fails for all pending pipelines
   → Must wait 60 seconds for next attempt
   ```
   
   **Recommended Fix**:
   ```java
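   import com.hazelcast.core.HazelcastInstanceNotActiveException;
   import com.hazelcast.map.IMap;
   import java.util.HashSet;
   import java.util.Set;

   private void cleanupPendingPipelines() {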
       if (!isActive) {
           return;
       }
   
       IMap<PipelineLocation, PipelineCleanupRecord> pendingCleanupIMap =
           this.pendingPipelineCleanupIMap;
       if (pendingCleanupIMap == null || pendingCleanupIMap.isEmpty()) {
           return;
       }
   
       // Copy the key set first to avoid concurrent modification
       Set<PipelineLocation> keys;
       try {
           keys = new HashSet<>(pendingCleanupIMap.keySet());
       } catch (Exception e) {
           logger.warning("Failed to get pending cleanup keys: " + e.getMessage());
           return;
       }
   
       // Now iterate over the copied key set
       for (PipelineLocation key : keys) {
           try {
               PipelineCleanupRecord record = pendingCleanupIMap.get(key);
               if (record != null) {
                   processPendingPipelineCleanup(key, record);
               }
           } catch (HazelcastInstanceNotActiveException e) {
               logger.warning("Skip cleanup: hazelcast not active");
               break;
           } catch (Throwable t) {
               logger.warning(
                   String.format("Failed to cleanup pipeline %s: %s", key, t.getMessage()), t);
               // Continue with next pipeline instead of crashing
           }
       }
   }
   ```
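   The fail-fast failure mode and the copy-keys-then-re-read pattern can be illustrated with a plain `java.util.HashMap`. This is an analogy, not Hazelcast behavior: Hazelcast's `IMap.keySet()`/`entrySet()` already return copies, so for the IMap the part that matters most is the per-entry `try`/`catch` that keeps one bad record from aborting the pass. `SnapshotIteration` and its methods are hypothetical.

   ```java
   import java.util.ArrayList;
   import java.util.ConcurrentModificationException;
   import java.util.HashMap;
   import java.util.HashSet;
   import java.util.List;
   import java.util.Map;
   import java.util.Set;

   public class SnapshotIteration {

       // Iterates the live key set and mutates the map inside the loop;
       // HashMap's fail-fast iterator detects the change on the next step.
       static boolean directIterationThrows(Map<String, Integer> map) {
           try {
               for (String key : map.keySet()) {
                   map.put(key + "-new", 0); // simulates a record enqueued mid-iteration
               }
               return false;
           } catch (ConcurrentModificationException e) {
               return true;
           }
       }

       // Copies the keys first, re-reads each value, and isolates failures per entry.
       static List<String> processSnapshot(Map<String, Integer> map) {
           Set<String> keys = new HashSet<>(map.keySet());
           List<String> processed = new ArrayList<>();
           for (String key : keys) {
               try {
                   Integer value = map.get(key);
                   if (value == null) {
                       continue; // entry removed after the snapshot; nothing to do
                   }
                   if (value < 0) {
                       throw new IllegalStateException("cannot clean " + key);
                   }
                   map.put(key + "-new", 0); // mutation does not disturb the snapshot loop
                   processed.add(key);
               } catch (RuntimeException e) {
                   // Log and move on instead of aborting the whole pass
                   System.out.println("skip " + key + ": " + e.getMessage());
               }
           }
           return processed;
       }

       public static void main(String[] args) {
           Map<String, Integer> live = new HashMap<>();
           live.put("a", 1);
           live.put("b", 2);
           System.out.println("direct iteration threw CME: " + directIterationThrows(live));

           Map<String, Integer> pending = new HashMap<>();
           pending.put("a", 1);
           pending.put("b", -1); // simulates a record whose cleanup fails
           pending.put("c", 2);
           System.out.println("processed: " + processSnapshot(pending));
       }
   }
   ```

   Entries `a` and `c` are processed while `b` is logged and skipped, and keys added during the pass are picked up on the next run rather than disturbing the current one.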
   
   
   ---
   
   ### Issue: Infinite Loop - Risk of Thread Starvation and CPU Spike
   
   **Severity**: **High**
   **File**: `JobMaster.java`
   **Location**: [JobMaster.java:934-951](seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java#L934-L951)
   
   **Problem Description**:
   - `enqueuePipelineCleanupIfNeeded()` runs its CAS loop in `while (true)` with no retry bound; it exits only when a write succeeds or no change is needed
   - In extreme high-concurrency scenarios, multiple threads competing for the same pipeline could **spin indefinitely**
   - This consumes CPU resources and may block the calling thread indefinitely
   
   **Current Code**:
   ```java
   while (true) {  // ← No retry bound
       PipelineCleanupRecord existing = pendingCleanupIMap.get(pipelineLocation);
       if (existing == null) {
           PipelineCleanupRecord prev = pendingCleanupIMap.putIfAbsent(pipelineLocation, newRecord);
           if (prev == null) return;
           existing = prev;
       }
       PipelineCleanupRecord merged = existing.mergeFrom(newRecord);
       if (merged.equals(existing)) return;
       if (pendingCleanupIMap.replace(pipelineLocation, existing, merged)) {
           return;
       }
       // ← If replace fails, the loop retries with no upper bound
   }
   ```
   
   **Impact**:
   - Thread starvation: calling thread may spin indefinitely
   - CPU spike: 100% CPU usage on one core
   - System instability: may affect job submission and scheduling
   
   **Risk Scenario**:
   ```
   10 threads simultaneously try to enqueue cleanup for Pipeline X
   Each thread:
     1. Reads current record
     2. Merges with new data
     3. Attempts CAS replace
     4. Fails due to another thread's update
     5. Retries indefinitely
   
   Result (worst case): all 10 threads keep spinning, consuming up to 1000% CPU
   ```
   
   **Recommended Fix**:
   ```java
   // Add this constant to JobMaster
   private static final int MAX_ENQUEUE_RETRIES = 100;
   
   public void enqueuePipelineCleanupIfNeeded(
           PipelineLocation pipelineLocation, PipelineStatus pipelineStatus) {
   
       // ... existing validation logic ...
   
       int retryCount = 0;
       while (retryCount < MAX_ENQUEUE_RETRIES) {
           PipelineCleanupRecord existing = pendingCleanupIMap.get(pipelineLocation);
   
           if (existing == null) {
               PipelineCleanupRecord prev = pendingCleanupIMap.putIfAbsent(pipelineLocation, newRecord);
               if (prev == null) {
                   return; // Success
               }
               existing = prev;
           }
   
           PipelineCleanupRecord merged = existing.mergeFrom(newRecord);
           if (merged.equals(existing)) {
               return; // No changes needed
           }
   
           if (pendingCleanupIMap.replace(pipelineLocation, existing, merged)) {
               return; // Success
           }
   
           retryCount++;
   
           // Optional: Add backoff to reduce contention
           if (retryCount % 10 == 0) {
               try {
                   Thread.sleep(1); // 1ms backoff every 10 retries
               } catch (InterruptedException e) {
                   Thread.currentThread().interrupt();
                   logger.warning("Enqueue cleanup interrupted for pipeline: " + pipelineLocation);
                   return;
               }
           }
       }
   
       // Failed after max retries: log it but don't throw.
       // Hazelcast's ILogger (which provides warning() above) has no error(); severe() is its highest level.
       logger.severe(String.format(
           "Failed to enqueue pipeline cleanup for %s after %d retries due to high contention. "
               + "Cleanup may be delayed until the next scheduled run.",
           pipelineLocation, MAX_ENQUEUE_RETRIES));
   }
   ```
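   A bounded CAS merge can be sketched with `ConcurrentMap.putIfAbsent` and `replace(key, oldValue, newValue)`, the same operations the PR uses on the IMap. This is a minimal sketch under assumptions: a `ConcurrentHashMap` stands in for the IMap, and the value is an immutable set of task-group ids merged by union, a hypothetical stand-in for `PipelineCleanupRecord.mergeFrom`. `BoundedCasMerge` and `mergeInto` are illustrative names.

   ```java
   import java.util.ArrayList;
   import java.util.Collections;
   import java.util.HashSet;
   import java.util.List;
   import java.util.Set;
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.concurrent.ConcurrentMap;
   import java.util.concurrent.CountDownLatch;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.Future;
   import java.util.concurrent.TimeUnit;

   public class BoundedCasMerge {

       static final int MAX_ENQUEUE_RETRIES = 100;

       // Merges `update` into the entry for `key`; returns false if retries ran out or the thread was interrupted.
       static boolean mergeInto(ConcurrentMap<String, Set<Integer>> map, String key, Set<Integer> update) {
           Set<Integer> fresh = Collections.unmodifiableSet(new HashSet<>(update));
           for (int attempt = 1; attempt <= MAX_ENQUEUE_RETRIES; attempt++) {
               Set<Integer> existing = map.get(key);
               if (existing == null) {
                   existing = map.putIfAbsent(key, fresh);
                   if (existing == null) {
                       return true; // inserted a new entry
                   }
               }
               Set<Integer> union = new HashSet<>(existing);
               union.addAll(fresh);
               if (union.equals(existing)) {
                   return true; // nothing new to merge
               }
               if (map.replace(key, existing, Collections.unmodifiableSet(union))) {
                   return true; // CAS succeeded
               }
               if (attempt % 10 == 0) {
                   try {
                       Thread.sleep(1); // short backoff every 10 failed attempts
                   } catch (InterruptedException e) {
                       Thread.currentThread().interrupt();
                       return false;
                   }
               }
           }
           return false; // caller decides how to report exhausted retries
       }

       public static void main(String[] args) throws Exception {
           ConcurrentMap<String, Set<Integer>> map = new ConcurrentHashMap<>();
           int threads = 8;
           ExecutorService pool = Executors.newFixedThreadPool(threads);
           CountDownLatch start = new CountDownLatch(1);
           List<Future<Boolean>> results = new ArrayList<>();
           for (int i = 0; i < threads; i++) {
               final int id = i;
               results.add(pool.submit(() -> {
                   start.await(); // release all threads together to provoke contention
                   return mergeInto(map, "pipeline-x", Collections.singleton(id));
               }));
           }
           start.countDown();
           for (Future<Boolean> f : results) {
               System.out.println("merged: " + f.get());
           }
           pool.shutdown();
           pool.awaitTermination(10, TimeUnit.SECONDS);
           System.out.println(map.get("pipeline-x"));
       }
   }
   ```

   In this sketch a failed `replace` means another thread's write landed between this thread's `get` and `replace`, so with N threads each merging once, a thread fails at most N-1 times; the retry cap acts as a safety net rather than the common path.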
   


-- 
This is an automated message from the Apache Git Service.