davidzollo commented on PR #10418:
URL: https://github.com/apache/seatunnel/pull/10418#issuecomment-3848210057
Good job.
Here is a review generated by Claude Code. Please check it first.
### Issue: Missing Retry Limit - Could Cause IMAP Memory Leak
**Severity**: **High**
**File**: `CoordinatorService.java`
**Location**:
[CoordinatorService.java:497](seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java#L497)
**Problem Description**:
- The code tracks `attemptCount` but does not enforce a maximum retry limit
- If a worker node fails permanently, TaskGroupContext cleanup will never succeed
- Cleanup records then accumulate indefinitely in the IMAP, causing **the IMAP itself to leak memory**
**Impact**:
- In a large-scale cluster with frequent job failures, thousands of cleanup records could accumulate
- Each `PipelineCleanupRecord` occupies roughly 500 bytes (rough estimate)
- 10,000 records = ~5 MB, multiplied by the number of copies Hazelcast keeps (the primary plus the map's backup count)
**Risk Scenario**:
```
Day 1: 100 failed cleanup records
Day 7: 700 failed cleanup records
Day 30: 3,000 failed cleanup records
→ IMAP grows indefinitely, defeating the purpose of this PR
```
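The impact estimate above is simple arithmetic. Below is a minimal sketch of it. It assumes the review's own figures (about 100 failed cleanups per day, ~500 bytes per record) and Hazelcast's default IMap backup count of 1. `CleanupFootprint` and its methods are hypothetical helpers, not SeaTunnel code.

```java
// Hypothetical helper reproducing the review's back-of-the-envelope numbers.
final class CleanupFootprint {
    /** Records left behind after `days` if none are ever removed (linear growth). */
    static long recordsAfter(int days, long failuresPerDay) {
        return days * failuresPerDay;
    }

    /** Approximate cluster-wide bytes: one primary copy plus one copy per backup. */
    static long estimateBytes(long records, long bytesPerRecord, int backupCount) {
        return records * bytesPerRecord * (1L + backupCount);
    }

    public static void main(String[] args) {
        System.out.println(recordsAfter(30, 100));         // 3000
        System.out.println(estimateBytes(10_000, 500, 0)); // 5000000 (~5 MB, primary only)
        System.out.println(estimateBytes(10_000, 500, 1)); // 10000000 (~10 MB with one backup)
    }
}
```

This growth stays linear only while nothing ever removes a record. A retry cap bounds how long each record can stay in the map.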
**Recommended Fix**:
```java
// Add this constant to CoordinatorService.
// 100 attempts at the 60s retry interval is roughly 100 minutes of retries.
private static final int MAX_CLEANUP_ATTEMPTS = 100;

private void processPendingPipelineCleanup(
        PipelineLocation pipelineLocation, PipelineCleanupRecord record) {
    // Add this check at the beginning: give up once the retry budget is spent
    if (record.getAttemptCount() >= MAX_CLEANUP_ATTEMPTS) {
        int cleanedTaskGroups =
                record.getCleanedTaskGroups() != null ? record.getCleanedTaskGroups().size() : 0;
        int totalTaskGroups =
                record.getTaskGroups() != null ? record.getTaskGroups().size() : 0;
        logger.warning(
                String.format(
                        "Pipeline %s cleanup exceeded max attempts (%d), giving up and removing record. "
                                + "Metrics cleaned: %s, TaskGroups cleaned: %d/%d",
                        pipelineLocation,
                        MAX_CLEANUP_ATTEMPTS,
                        record.isMetricsImapCleaned(),
                        cleanedTaskGroups,
                        totalTaskGroups));
        removePendingCleanupRecord(pipelineLocation, record);
        return;
    }
    // Continue with existing cleanup logic...
}
```
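To see how the cap bounds the map, here is a self-contained sketch. It replaces the IMAP with a plain `Map` and each `PipelineCleanupRecord` with a bare attempt counter. `CappedCleanup`, `tick`, and the string pipeline IDs are hypothetical; the real code would read and write `PipelineCleanupRecord` objects instead.

```java
import java.util.ArrayList;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical sketch: pipeline ID -> failed cleanup attempts, standing in for the IMAP.
final class CappedCleanup {
    static final int MAX_CLEANUP_ATTEMPTS = 100;

    /** One scheduled run. Returns how many records were dropped for hitting the cap. */
    static int tick(Map<String, Integer> attempts, Predicate<String> tryCleanup) {
        int dropped = 0;
        for (String pipeline : new ArrayList<>(attempts.keySet())) {
            Integer count = attempts.get(pipeline);
            if (count == null) {
                continue; // removed by someone else since the keys were copied
            }
            if (count >= MAX_CLEANUP_ATTEMPTS) {
                attempts.remove(pipeline); // give up so the map cannot grow forever
                dropped++;
            } else if (tryCleanup.test(pipeline)) {
                attempts.remove(pipeline); // cleanup finished
            } else {
                attempts.put(pipeline, count + 1); // one more failed attempt
            }
        }
        return dropped;
    }
}
```

Consider a cleanup that always fails, for example because a worker is permanently lost. Its record survives exactly 100 failed attempts and is dropped on the 101st run. The map's size is therefore bounded by the arrival rate times the retry window, instead of growing without limit.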
---
### Issue: Concurrent IMAP Traversal - Risk of ConcurrentModificationException
**Severity**: **High**
**File**: `CoordinatorService.java`
**Location**:
[CoordinatorService.java:477-480](seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java#L477-L480)
**Problem Description**:
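- `cleanupPendingPipelines()` iterates directly over `pendingCleanupIMap.entrySet()`
- If another thread (e.g., JobMaster) modifies the IMAP during iteration, a `ConcurrentModificationException` may be thrown
- Caveat: Hazelcast documents `IMap.entrySet()` as returning a copy that is not backed by the map, so this particular exception may not occur in practice. The more concrete risk is that any exception raised while processing one entry escapes the loop.
- Either way, the entire cleanup run fails, and the remaining pipelines wait 60 seconds for the next retry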
**Current Code**:
```java
for (Map.Entry<PipelineLocation, PipelineCleanupRecord> entry :
        pendingCleanupIMap.entrySet()) { // ← Unsafe: direct iteration
    processPendingPipelineCleanup(entry.getKey(), entry.getValue());
}
```
**Impact**:
- Cleanup task crashes silently
- Resources remain uncleaned until the next scheduled run
- In high-throughput clusters, this could happen frequently
**Risk Scenario**:
```
Thread 1 (Cleanup): Iterating pendingCleanupIMap.entrySet()
Thread 2 (JobMaster): Adds new cleanup record to IMAP
→ Thread 1 throws ConcurrentModificationException
→ Cleanup fails for all pending pipelines
→ Must wait 60 seconds for next attempt
```
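The fix relies on a standard Java idiom: copy the keys, then iterate the copy. The sketch below shows the idiom with a plain `java.util.HashMap`, whose iterators are fail-fast. It illustrates the idiom only and does not model Hazelcast behavior, since Hazelcast's `IMap.keySet()` and `entrySet()` already return copies. `SnapshotIterationDemo` is a hypothetical name.

```java
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;

// Hypothetical demo of the "copy keys, then iterate" idiom on a fail-fast HashMap.
final class SnapshotIterationDemo {
    private static Map<String, Integer> twoEntries() {
        Map<String, Integer> map = new HashMap<>();
        map.put("pipeline-1", 1);
        map.put("pipeline-2", 2);
        return map;
    }

    /** Adding keys while iterating the live key set trips HashMap's fail-fast iterator. */
    static boolean directIterationThrows() {
        Map<String, Integer> map = twoEntries();
        try {
            for (String key : map.keySet()) {
                map.put(key + "-new", 0); // structural change during iteration
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    /** Iterating a copy is safe; keys added during the loop are simply not visited. */
    static int snapshotIterationCount() {
        Map<String, Integer> map = twoEntries();
        int visited = 0;
        for (String key : new HashSet<>(map.keySet())) {
            map.put(key + "-new", 0);
            visited++;
        }
        return visited;
    }
}
```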
**Recommended Fix**:
```java
// Imports needed in CoordinatorService (Hazelcast 5.x package names)
import com.hazelcast.core.HazelcastInstanceNotActiveException;
import com.hazelcast.map.IMap;
import java.util.HashSet;
import java.util.Set;

private void cleanupPendingPipelines() {
    if (!isActive) {
        return;
    }
    IMap<PipelineLocation, PipelineCleanupRecord> pendingCleanupIMap =
            this.pendingPipelineCleanupIMap;
    if (pendingCleanupIMap == null || pendingCleanupIMap.isEmpty()) {
        return;
    }
    // Copy the key set first to avoid iterating a live view while others modify the map
    Set<PipelineLocation> keys;
    try {
        keys = new HashSet<>(pendingCleanupIMap.keySet());
    } catch (Exception e) {
        logger.warning("Failed to get pending cleanup keys: " + e.getMessage());
        return;
    }
    // Now iterate over the copied key set
    for (PipelineLocation key : keys) {
        try {
            PipelineCleanupRecord record = pendingCleanupIMap.get(key);
            if (record != null) { // may have been removed since the copy was taken
                processPendingPipelineCleanup(key, record);
            }
        } catch (HazelcastInstanceNotActiveException e) {
            logger.warning("Skip cleanup: hazelcast not active");
            break;
        } catch (Throwable t) {
            // Continue with the next pipeline instead of aborting the whole run
            logger.warning(
                    String.format("Failed to cleanup pipeline %s: %s", key, t.getMessage()), t);
        }
    }
}
```
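The key property of the fix is per-entry isolation: one failing pipeline must not stop the others. Below is a self-contained sketch with an ordinary `Map` standing in for the IMAP. `IsolatedCleanupLoop` and `processAll` are hypothetical names.

It catches `RuntimeException` rather than `Throwable`, so JVM errors such as `OutOfMemoryError` still propagate. Which of the two to catch is a judgment call for the real code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical sketch of a cleanup pass where one failure cannot abort the rest.
final class IsolatedCleanupLoop {
    /** Tries every pending key; returns the keys whose cleanup failed (kept for the next run). */
    static List<String> processAll(Map<String, String> pending, Consumer<String> cleanup) {
        List<String> failed = new ArrayList<>();
        for (String key : new ArrayList<>(pending.keySet())) { // snapshot the keys first
            try {
                cleanup.accept(key);
                pending.remove(key); // success: drop the record
            } catch (RuntimeException e) {
                failed.add(key); // keep the record and move on to the next key
                System.err.println("cleanup failed for " + key + ": " + e.getMessage());
            }
        }
        return failed;
    }
}
```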
---
### Issue: Infinite Loop - Risk of Thread Starvation and CPU Spike
**Severity**: **High**
**File**: `JobMaster.java`
**Location**:
[JobMaster.java:934-951](seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java#L934-L951)
**Problem Description**:
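- `enqueuePipelineCleanupIfNeeded()` runs its CAS loop in `while (true)` with no retry limit
- Under heavy contention, multiple threads competing for the same pipeline keep retrying. Each failed `replace` means another thread's update succeeded, so contention alone should settle after a bounded number of rounds. However, if `replace` keeps failing for any other reason, the loop spins **indefinitely**.
- This wastes CPU and may block the calling thread indefinitely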
**Current Code**:
```java
while (true) { // ← No exit condition!
    PipelineCleanupRecord existing = pendingCleanupIMap.get(pipelineLocation);
    if (existing == null) {
        PipelineCleanupRecord prev =
                pendingCleanupIMap.putIfAbsent(pipelineLocation, newRecord);
        if (prev == null) return;
        existing = prev;
    }
    PipelineCleanupRecord merged = existing.mergeFrom(newRecord);
    if (merged.equals(existing)) return;
    if (pendingCleanupIMap.replace(pipelineLocation, existing, merged)) {
        return;
    }
    // ← If replace fails, loop continues indefinitely
}
```
**Impact**:
- Thread starvation: calling thread may spin indefinitely
- CPU spike: sustained CPU and network load from repeated IMap round trips
- System instability: may affect job submission and scheduling
**Risk Scenario**:
```
10 threads simultaneously try to enqueue cleanup for Pipeline X
Each thread:
1. Reads current record
2. Merges with new data
3. Attempts CAS replace
4. Fails due to another thread's update
5. Retries, with no upper bound on attempts
Result: threads repeatedly retry the same IMap operations; in the worst case a caller never returns
```
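Why contention alone should not cause endless spinning: a `replace(key, existing, merged)` fails only when another thread changed the value after our read, which means some other thread made progress. The sketch below checks this with `java.util.concurrent.ConcurrentHashMap` standing in for the IMAP. Hazelcast's `IMap` implements `ConcurrentMap`, though its calls are remote and its semantics may differ in detail.

In the sketch, N threads each add a distinct member to the same key, so each thread can fail at most N - 1 times. `CasContentionDemo` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo: N threads each merge one distinct member into the same key.
final class CasContentionDemo {
    /** Same shape as the reviewed (unbounded) loop; returns how many replace() calls failed. */
    static int mergeMember(ConcurrentMap<String, Set<String>> map, String key, String member) {
        int failures = 0;
        while (true) {
            Set<String> existing = map.get(key);
            if (existing == null) {
                existing = map.putIfAbsent(key, Set.of(member));
                if (existing == null) {
                    return failures; // created the entry
                }
            }
            if (existing.contains(member)) {
                return failures; // already merged, nothing to do
            }
            Set<String> merged = new HashSet<>(existing);
            merged.add(member);
            if (map.replace(key, existing, Set.copyOf(merged))) {
                return failures;
            }
            failures++; // another thread's update landed after our read
        }
    }

    /** Returns {final set size, total failed replace() calls}. */
    static int[] run(int threads) {
        ConcurrentMap<String, Set<String>> map = new ConcurrentHashMap<>();
        AtomicInteger totalFailures = new AtomicInteger();
        List<Thread> workers = new ArrayList<>();
        for (int i = 0; i < threads; i++) {
            String member = "t" + i;
            Thread t = new Thread(() -> totalFailures.addAndGet(mergeMember(map, "pipeline-X", member)));
            workers.add(t);
            t.start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return new int[] {map.get("pipeline-X").size(), totalFailures.get()};
    }
}
```

The loop's real weakness is that nothing bounds it when `replace` fails for a reason other than contention. That is the case a retry limit guards against.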
**Recommended Fix**:
```java
// Add this constant to JobMaster
private static final int MAX_ENQUEUE_RETRIES = 100;

public void enqueuePipelineCleanupIfNeeded(
        PipelineLocation pipelineLocation, PipelineStatus pipelineStatus) {
    // ... existing validation logic and construction of newRecord ...
    int retryCount = 0;
    while (retryCount < MAX_ENQUEUE_RETRIES) {
        PipelineCleanupRecord existing = pendingCleanupIMap.get(pipelineLocation);
        if (existing == null) {
            PipelineCleanupRecord prev =
                    pendingCleanupIMap.putIfAbsent(pipelineLocation, newRecord);
            if (prev == null) {
                return; // Success: we created the record
            }
            existing = prev;
        }
        PipelineCleanupRecord merged = existing.mergeFrom(newRecord);
        if (merged.equals(existing)) {
            return; // No changes needed
        }
        if (pendingCleanupIMap.replace(pipelineLocation, existing, merged)) {
            return; // Success
        }
        retryCount++;
        // Optional: add backoff to reduce contention
        if (retryCount % 10 == 0) {
            try {
                Thread.sleep(1); // 1ms backoff every 10 retries
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve interrupt status
                logger.warning("Enqueue cleanup interrupted for pipeline: " + pipelineLocation);
                return;
            }
        }
    }
    // Failed after max retries: log the failure but don't throw.
    // Assumes the Hazelcast ILogger used by logger.warning() above, which offers severe() rather than error().
    logger.severe(
            String.format(
                    "Failed to enqueue pipeline cleanup for %s after %d retries due to high contention. "
                            + "Cleanup may be delayed until the next scheduled run.",
                    pipelineLocation, MAX_ENQUEUE_RETRIES));
}
```
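The same bounded-retry pattern can be pulled into one generic helper, so the retry policy lives in a single place. Below is a minimal sketch against `java.util.concurrent.ConcurrentMap`, which Hazelcast's `IMap` implements. `BoundedCas` and `mergeBounded` are hypothetical names, and `mergeFn` stands in for `PipelineCleanupRecord.mergeFrom`.

```java
import java.util.concurrent.ConcurrentMap;
import java.util.function.BinaryOperator;

// Hypothetical generic helper mirroring the recommended fix.
final class BoundedCas {
    /**
     * Merges `update` into the value at `key`, giving up after `maxRetries` failed replace() calls.
     * Returns true once the merge is reflected in the map; false if the retry budget ran out
     * or the thread was interrupted during backoff.
     */
    static <K, V> boolean mergeBounded(
            ConcurrentMap<K, V> map, K key, V update, BinaryOperator<V> mergeFn, int maxRetries) {
        for (int retry = 1; retry <= maxRetries; retry++) {
            V existing = map.get(key);
            if (existing == null) {
                existing = map.putIfAbsent(key, update);
                if (existing == null) {
                    return true; // created the entry
                }
            }
            V merged = mergeFn.apply(existing, update);
            if (merged.equals(existing) || map.replace(key, existing, merged)) {
                return true; // nothing to change, or our replace won
            }
            if (retry % 10 == 0) { // brief backoff every 10 failures to ease contention
                try {
                    Thread.sleep(1);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve interrupt status for the caller
                    return false;
                }
            }
        }
        return false; // the caller decides how loudly to log this
    }
}
```

Returning a boolean keeps the helper free of logging concerns. A caller such as `enqueuePipelineCleanupIfNeeded()` would log at its chosen level when it gets `false`.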