zhangshenghang commented on PR #10418:
URL: https://github.com/apache/seatunnel/pull/10418#issuecomment-3868881139
> Good job. Here is a review from claudecode; please check it first.
>
> ### Issue: Missing Retry Limit - Could Cause IMAP Memory Leak
> **Severity**: **High** **File**: `CoordinatorService.java` **Location**: [CoordinatorService.java:497](seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java#L497)
>
> **Problem Description**:
>
> * The code tracks `attemptCount` but does not enforce a maximum retry limit
> * If a worker node permanently fails, TaskGroupContext cleanup will never succeed
> * Cleanup records will accumulate indefinitely in the IMAP, causing **the IMAP itself to leak memory**
>
> **Impact**:
>
> * In a large-scale cluster with frequent job failures, thousands of cleanup records could accumulate
> * Each `PipelineCleanupRecord` occupies ~500 bytes
> * 10,000 records = ~5MB memory, multiplied by Hazelcast replication factor
>
> **Risk Scenario**:
>
> ```
> Day 1: 100 failed cleanup records
> Day 7: 700 failed cleanup records
> Day 30: 3,000 failed cleanup records
> → IMAP grows indefinitely, defeating the purpose of this PR
> ```
>
> **Recommended Fix**:
>
> ```java
> // Add this constant to CoordinatorService
> // (100 attempts at 60s intervals is roughly 100 minutes of retries)
> private static final int MAX_CLEANUP_ATTEMPTS = 100;
>
> private void processPendingPipelineCleanup(
>         PipelineLocation pipelineLocation, PipelineCleanupRecord record) {
>
>     // Add this check at the beginning
>     if (record.getAttemptCount() > MAX_CLEANUP_ATTEMPTS) {
>         logger.warning(String.format(
>                 "Pipeline %s cleanup exceeded max attempts (%d), giving up and removing record. "
>                         + "Metrics cleaned: %s, TaskGroups cleaned: %d/%d",
>                 pipelineLocation, MAX_CLEANUP_ATTEMPTS,
>                 record.isMetricsImapCleaned(),
>                 record.getCleanedTaskGroups() != null ? record.getCleanedTaskGroups().size() : 0,
>                 record.getTaskGroups() != null ? record.getTaskGroups().size() : 0));
>         removePendingCleanupRecord(pipelineLocation, record);
>         return;
>     }
>
>     // Continue with existing cleanup logic...
> }
> ```
>
> ### Issue: Concurrent IMAP Traversal - Risk of ConcurrentModificationException
> **Severity**: **High** **File**: `CoordinatorService.java` **Location**: [CoordinatorService.java:477-480](seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java#L477-L480)
>
> **Problem Description**:
>
> * `cleanupPendingPipelines()` directly iterates over `pendingCleanupIMap.entrySet()`
> * If another thread (e.g., JobMaster) modifies the IMAP during iteration, `ConcurrentModificationException` will be thrown
> * This causes the entire cleanup task to fail, requiring a 60-second wait until the next retry
>
> **Current Code**:
>
> ```java
> for (Map.Entry<PipelineLocation, PipelineCleanupRecord> entry :
>         pendingCleanupIMap.entrySet()) { // ← Unsafe: direct iteration
>     processPendingPipelineCleanup(entry.getKey(), entry.getValue());
> }
> ```
>
> **Impact**:
>
> * Cleanup task crashes silently
> * Resources remain uncleaned until the next scheduled run
> * In high-throughput clusters, this could happen frequently
>
> **Risk Scenario**:
>
> ```
> Thread 1 (Cleanup): Iterating pendingCleanupIMap.entrySet()
> Thread 2 (JobMaster): Adds new cleanup record to IMAP
> → Thread 1 throws ConcurrentModificationException
> → Cleanup fails for all pending pipelines
> → Must wait 60 seconds for next attempt
> ```
>
> **Recommended Fix**:
>
> ```java
> private void cleanupPendingPipelines() {
>     if (!isActive) {
>         return;
>     }
>
>     IMap<PipelineLocation, PipelineCleanupRecord> pendingCleanupIMap =
>             this.pendingPipelineCleanupIMap;
>     if (pendingCleanupIMap == null || pendingCleanupIMap.isEmpty()) {
>         return;
>     }
>
>     // Copy the key set first to avoid concurrent modification
>     Set<PipelineLocation> keys;
>     try {
>         keys = new HashSet<>(pendingCleanupIMap.keySet());
>     } catch (Exception e) {
>         logger.warning("Failed to get pending cleanup keys: " + e.getMessage());
>         return;
>     }
>
>     // Now iterate over the copied key set
>     for (PipelineLocation key : keys) {
>         try {
>             PipelineCleanupRecord record = pendingCleanupIMap.get(key);
>             if (record != null) {
>                 processPendingPipelineCleanup(key, record);
>             }
>         } catch (HazelcastInstanceNotActiveException e) {
>             logger.warning("Skip cleanup: hazelcast not active");
>             break;
>         } catch (Throwable t) {
>             logger.warning(String.format(
>                     "Failed to cleanup pipeline %s: %s", key, t.getMessage()), t);
>             // Continue with next pipeline instead of crashing
>         }
>     }
> }
> ```
>
> ### Issue: Infinite Loop - Risk of Thread Starvation and CPU Spike
> **Severity**: **High** **File**: `JobMaster.java` **Location**: [JobMaster.java:934-951](seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java#L934-L951)
>
> **Problem Description**:
>
> * `enqueuePipelineCleanupIfNeeded()` uses `while(true)` for CAS operations without an exit condition
> * In extreme high-concurrency scenarios, multiple threads competing for the same pipeline could cause **indefinite spinning**
> * This consumes CPU resources and may block the calling thread indefinitely
>
> **Current Code**:
>
> ```java
> while (true) { // ← No exit condition!
>     PipelineCleanupRecord existing = pendingCleanupIMap.get(pipelineLocation);
>     if (existing == null) {
>         PipelineCleanupRecord prev =
>                 pendingCleanupIMap.putIfAbsent(pipelineLocation, newRecord);
>         if (prev == null) return;
>         existing = prev;
>     }
>     PipelineCleanupRecord merged = existing.mergeFrom(newRecord);
>     if (merged.equals(existing)) return;
>     if (pendingCleanupIMap.replace(pipelineLocation, existing, merged)) {
>         return;
>     }
>     // ← If replace fails, loop continues indefinitely
> }
> ```
>
> **Impact**:
>
> * Thread starvation: calling thread may spin indefinitely
> * CPU spike: 100% CPU usage on one core
> * System instability: may affect job submission and scheduling
>
> **Risk Scenario**:
>
> ```
> 10 threads simultaneously try to enqueue cleanup for Pipeline X
> Each thread:
> 1. Reads current record
> 2. Merges with new data
> 3. Attempts CAS replace
> 4. Fails due to another thread's update
> 5. Retries indefinitely
>
> Result: All 10 threads spinning, consuming 1000% CPU
> ```
>
> **Recommended Fix**:
>
> ```java
> // Add this constant to JobMaster
> private static final int MAX_ENQUEUE_RETRIES = 100;
>
> public void enqueuePipelineCleanupIfNeeded(
>         PipelineLocation pipelineLocation, PipelineStatus pipelineStatus) {
>
>     // ... existing validation logic ...
>
>     int retryCount = 0;
>     while (retryCount < MAX_ENQUEUE_RETRIES) {
>         PipelineCleanupRecord existing = pendingCleanupIMap.get(pipelineLocation);
>
>         if (existing == null) {
>             PipelineCleanupRecord prev =
>                     pendingCleanupIMap.putIfAbsent(pipelineLocation, newRecord);
>             if (prev == null) {
>                 return; // Success
>             }
>             existing = prev;
>         }
>
>         PipelineCleanupRecord merged = existing.mergeFrom(newRecord);
>         if (merged.equals(existing)) {
>             return; // No changes needed
>         }
>
>         if (pendingCleanupIMap.replace(pipelineLocation, existing, merged)) {
>             return; // Success
>         }
>
>         retryCount++;
>
>         // Optional: Add backoff to reduce contention
>         if (retryCount % 10 == 0) {
>             try {
>                 Thread.sleep(1); // 1ms backoff every 10 retries
>             } catch (InterruptedException e) {
>                 Thread.currentThread().interrupt();
>                 logger.warning("Enqueue cleanup interrupted for pipeline: " + pipelineLocation);
>                 return;
>             }
>         }
>     }
>
>     // Failed after max retries - log error but don't throw exception
>     logger.error(String.format(
>             "Failed to enqueue pipeline cleanup for %s after %d retries due to high contention. "
>                     + "Cleanup may be delayed until next scheduled run.",
>             pipelineLocation, MAX_ENQUEUE_RETRIES));
> }
> ```
Issue 1: A maximum number of retries must not be set, as it can easily
lead to data loss and program exceptions. Records must be deleted through
the normal cleanup logic.
Issue 2: It will not throw a ConcurrentModificationException. The result
returned by IMap.entrySet() is a cloned copy, not the live distributed data.
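
The point about the cloned result can be illustrated with a plain-JDK analogy. This is only a sketch and does not use Hazelcast: it assumes that iterating a copied `HashMap` behaves like iterating the cloned set that `IMap.entrySet()` returns. The names `SnapshotIterationSketch` and `drainSnapshot` are hypothetical and do not appear in the PR.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class SnapshotIterationSketch {

    // Iterates a snapshot of the entries while mutating the source map.
    // The loop walks the copy, so writes to `source` during iteration
    // cannot raise ConcurrentModificationException.
    public static List<String> drainSnapshot(Map<String, Integer> source) {
        List<String> visited = new ArrayList<>();
        // Copy first: a stand-in (assumption) for the cloned entry set.
        Map<String, Integer> snapshot = new HashMap<>(source);
        for (Map.Entry<String, Integer> entry : snapshot.entrySet()) {
            visited.add(entry.getKey());
            // Mutate the source mid-iteration, as another writer might.
            source.put(entry.getKey() + "-late", entry.getValue());
        }
        return visited;
    }

    public static void main(String[] args) {
        Map<String, Integer> pending = new HashMap<>();
        pending.put("pipeline-1", 1);
        pending.put("pipeline-2", 2);
        List<String> visited = drainSnapshot(pending);
        System.out.println("visited=" + visited.size() + ", mapSize=" + pending.size());
    }
}
```

Records added during the loop are not seen by that pass; in this analogy they would simply be picked up by the next scheduled run.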
Issue 3: This is only triggered at the end of a SubPlan, so it is not a
frequent call. For more stability, I will add a sleep between retries instead
of setting a maximum number of attempts.
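
A rough sketch of what a sleep-based retry without a cap could look like is shown below. It is written against `java.util.concurrent.ConcurrentMap` (which Hazelcast's `IMap` extends) and uses a `ConcurrentHashMap` so that it runs standalone. The class name, the `Set<String>` record type, and the 1 ms `RETRY_SLEEP_MILLIS` value are assumptions made for illustration, not the actual change in the PR.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class EnqueueWithBackoffSketch {

    // Hypothetical pause after a lost CAS race; not taken from the PR.
    static final long RETRY_SLEEP_MILLIS = 1L;

    // Merges `added` into the set stored under `key`, retrying until the
    // compare-and-set succeeds. There is no retry cap; the short sleep
    // after each lost race keeps the loop from spinning hot.
    public static void enqueue(
            ConcurrentMap<String, Set<String>> map, String key, Set<String> added) {
        while (true) {
            Set<String> existing = map.get(key);
            if (existing == null) {
                existing = map.putIfAbsent(
                        key, Collections.unmodifiableSet(new HashSet<>(added)));
                if (existing == null) {
                    return; // first writer for this key
                }
            }
            Set<String> merged = new HashSet<>(existing);
            if (!merged.addAll(added)) {
                return; // nothing new to record
            }
            if (map.replace(key, existing, Collections.unmodifiableSet(merged))) {
                return; // CAS succeeded
            }
            try {
                Thread.sleep(RETRY_SLEEP_MILLIS); // lost the race: back off, then retry
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while enqueuing " + key, e);
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ConcurrentMap<String, Set<String>> pending = new ConcurrentHashMap<>();
        Thread[] workers = new Thread[8];
        for (int i = 0; i < workers.length; i++) {
            final String taskGroup = "task-group-" + i;
            workers[i] = new Thread(
                    () -> enqueue(pending, "pipeline-1", Collections.singleton(taskGroup)));
            workers[i].start();
        }
        for (Thread worker : workers) {
            worker.join();
        }
        System.out.println("merged entries: " + pending.get("pipeline-1").size());
    }
}
```

Because every failed `replace` means another writer made progress, the loop should terminate once contention subsides; the sleep only limits CPU use while waiting.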