DanielCarter-stack commented on PR #10418:
URL: https://github.com/apache/seatunnel/pull/10418#issuecomment-3817674342
### Issue 1: Missing Maximum Retry Limit
**Location**:
- `CoordinatorService.java:496-562` (processPendingPipelineCleanup method)
- `PipelineCleanupRecord.java:54` (attemptCount field)
**Related Context**:
- Definition: `PipelineCleanupRecord.java:54`
- Update: `CoordinatorService.java:514` -
`updated.setAttemptCount(record.getAttemptCount() + 1);`
- Usage: None (recorded but not used)
**Problem Description**:
The `attemptCount` field records how many cleanup attempts have been made,
but nothing in the code uses it to cap retries. If a Pipeline can never be
cleaned up (for example, because a Worker node is permanently offline), its
record stays in the cleanup queue indefinitely, retried every 60 seconds, and
is never removed.
**Potential Risks**:
- **Risk1**: If many Pipelines cannot be cleaned up (for example, after a
large-scale Worker failure), `pendingPipelineCleanupIMap` keeps growing and
consumes Hazelcast memory
- **Risk2**: Every attempt generates log output and Hazelcast operations,
wasting resources
- **Risk3**: Operators cannot distinguish temporary failures from permanent
ones
**Impact Scope**:
- **Direct Impact**: `CoordinatorService.cleanupPendingPipelines()`
- **Indirect Impact**: Hazelcast IMap memory usage
- **Affected Area**: Core framework, all SeaTunnel users
**Severity**: MAJOR
**Improvement Suggestions**:
```java
// CoordinatorService.java
private static final int MAX_CLEANUP_ATTEMPTS = 10; // ~10 minutes at the 60s interval

private void processPendingPipelineCleanup(
        PipelineLocation pipelineLocation, PipelineCleanupRecord record) {
    // ... existing logic ...

    // After the attempt counters have been updated, give up once the limit is reached
    if (updated.getAttemptCount() >= MAX_CLEANUP_ATTEMPTS && !updated.isCleaned()) {
        logger.severe(
                String.format(
                        "Pipeline %s cleanup failed after %d attempts. Giving up. "
                                + "Metrics cleaned: %s, TaskGroups cleaned: %s/%s",
                        pipelineLocation,
                        updated.getAttemptCount(),
                        updated.isMetricsImapCleaned(),
                        updated.getCleanedTaskGroups().size(),
                        updated.getTaskGroups() != null ? updated.getTaskGroups().size() : 0));
        // Remove the record even when giving up, to avoid unbounded accumulation
        pendingPipelineCleanupIMap.remove(pipelineLocation, record);
        return;
    }

    // ... existing logic ...
}
```
**Rationale**:
- Prevents infinite retries
- Frees Hazelcast memory
- Alerts operators that manual intervention is needed
- A 10-minute retry window (10 attempts at the 60-second interval) is enough
to ride out transient network fluctuations
---
### Issue 2: Missing Record Expiration Time (TTL)
**Location**:
- `PipelineCleanupRecord.java:52-53` (createTimeMillis,
lastAttemptTimeMillis)
- `Constant.java:62` (IMAP_PENDING_PIPELINE_CLEANUP definition)
**Related Context**:
- Creation: `JobMaster.java:929` - `now` (createTimeMillis)
- Update: `CoordinatorService.java:513` -
`updated.setLastAttemptTimeMillis(now)`
**Problem Description**:
The cleanup records do not have TTL (Time To Live) set. Although records are
deleted after successful cleanup, in certain abnormal situations (such as code
bugs, Hazelcast failures), records may remain in the IMap forever. In addition,
even if records have been in the queue for a long time (for example, several
days), there is no forced expiration mechanism.
**Potential Risks**:
- **Risk1**: Orphaned records occupy Hazelcast memory
- **Risk2**: Without a TTL configured on the IMap, such records persist
indefinitely
- **Risk3**: Long-running clusters can accumulate a large number of stale
records
**Impact Scope**:
- **Direct Impact**: `IMAP_PENDING_PIPELINE_CLEANUP` IMap
- **Indirect Impact**: Hazelcast cluster memory usage
- **Affected Area**: Core framework, production environment
**Severity**: MAJOR
**Improvement Suggestions**:
```java
// Option 1: Set TTL in the IMap configuration (recommended)
// Constant.java
public static final String IMAP_PENDING_PIPELINE_CLEANUP = "engine_pendingPipelineCleanup";

// In the Hazelcast configuration (to be documented, or set in initialization code):
config.getMapConfig("engine_pendingPipelineCleanup")
        .setMaxIdleSeconds(86400); // expires after 24 hours of inactivity

// Option 2: Check the expiration time in code
private static final long CLEANUP_RECORD_TTL_MILLIS = TimeUnit.HOURS.toMillis(24);

private void processPendingPipelineCleanup(
        PipelineLocation pipelineLocation, PipelineCleanupRecord record) {
    long now = System.currentTimeMillis();
    // Drop records that have exceeded their TTL
    if (now - record.getCreateTimeMillis() > CLEANUP_RECORD_TTL_MILLIS) {
        logger.warning(
                String.format(
                        "Pipeline cleanup record %s expired after %d ms. Removing.",
                        pipelineLocation,
                        now - record.getCreateTimeMillis()));
        pendingPipelineCleanupIMap.remove(pipelineLocation, record);
        return;
    }
    // ... existing logic ...
}
```
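A caveat on Option 1, based on how Hazelcast expiry works: `max-idle-seconds`
is reset by reads and `time-to-live-seconds` is reset by writes, and the
cleanup scheduler both reads and rewrites every pending record each cycle, so
a map-level expiry may never fire for records that are still being retried.
That makes the explicit `createTimeMillis` check of Option 2 the more
reliable variant; Option 1 still helps as a safety net for orphaned records
nothing touches anymore. A minimal sketch of wiring it, assuming the
Hazelcast `Config` object is accessible during server bootstrap:
```java
import java.util.concurrent.TimeUnit;

import com.hazelcast.config.Config;

private static void configurePendingCleanupMapExpiry(Config hazelcastConfig) {
    // Safety net only: expires records that nothing reads or writes anymore.
    // Records still being retried are touched every cycle and will not idle out.
    hazelcastConfig
            .getMapConfig(Constant.IMAP_PENDING_PIPELINE_CLEANUP)
            .setMaxIdleSeconds((int) TimeUnit.HOURS.toSeconds(24));
}
```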
**Rationale**:
- Prevents unbounded accumulation of records
- Records are cleaned up automatically even in abnormal situations
- A 24-hour TTL leaves a generous retry window
---
### Issue 3: PipelineCleanupRecord's isCleaned() Method Has NPE Risk
**Location**: `PipelineCleanupRecord.java:132-137`
```java
public boolean isCleaned() {
    return metricsImapCleaned
            && taskGroups != null
            && cleanedTaskGroups != null
            && cleanedTaskGroups.containsAll(taskGroups.keySet());
}
```
**Related Context**:
- Caller: `CoordinatorService.java:559` - `if (updated.isCleaned())`
- Data Source: Deserialization (readData) or newly created
**Problem Description**:
Although `taskGroups` and `cleanedTaskGroups` are initialized to empty
collections in the constructor, the deserialization path (`readData()`) sets
them to `null` when the serialized size is -1:
```java
// PipelineCleanupRecord.java:104-114
int taskGroupsSize = in.readInt();
if (taskGroupsSize >= 0) {
    taskGroups = new HashMap<>(taskGroupsSize);
    // ...
} else {
    taskGroups = null; // may be null after deserialization
}
```
The current `writeData()` only writes -1 when the collection is null, so the
guards above prevent an NPE today, but this remains a latent risk point,
especially across serialization versions.
**Potential Risks**:
- **Risk1**: Future changes to the serialization logic may introduce an NPE
- **Risk2**: Errors may occur when interacting with records written by older
versions
- **Risk3**: With the current short-circuit guards, a record deserialized
with `taskGroups == null` does not throw, but it can never satisfy
`isCleaned()` and therefore sits in the queue forever
**Impact Scope**:
- **Direct Impact**: `PipelineCleanupRecord.isCleaned()`
- **Caller**: `CoordinatorService.processPendingPipelineCleanup()`
- **Affected Area**: Core cleanup logic
**Severity**: MINOR (current code will not trigger, but defensive
programming suggests fixing)
**Improvement Suggestions**:
```java
public boolean isCleaned() {
    if (!metricsImapCleaned) {
        return false;
    }
    if (taskGroups == null || taskGroups.isEmpty()) {
        // No taskGroups to track: the metrics flag (already true here) decides
        return true;
    }
    if (cleanedTaskGroups == null) {
        return false;
    }
    return cleanedTaskGroups.containsAll(taskGroups.keySet());
}
```
Or a more concise version:
```java
public boolean isCleaned() {
    return metricsImapCleaned
            && (taskGroups == null
                    || taskGroups.isEmpty()
                    || (cleanedTaskGroups != null
                            && cleanedTaskGroups.containsAll(taskGroups.keySet())));
}
```
**Rationale**:
- Defensive programming avoids the latent NPE
- Explicitly handles the null/empty collection edge cases, so deserialized
records can still complete and be removed
- Improves code robustness
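To lock the fix in, a regression test could simulate the post-deserialization
state directly. A minimal sketch, assuming JUnit 5 and the Lombok-generated
setters from `@Data` (the exact collection types are inferred from how
`isCleaned()` uses them):
```java
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertTrue;

import org.junit.jupiter.api.Test;

class PipelineCleanupRecordTest {

    @Test
    void isCleanedToleratesNullCollections() {
        PipelineCleanupRecord record = new PipelineCleanupRecord();
        record.setMetricsImapCleaned(true);
        // Simulate readData() having produced null collections (size == -1)
        record.setTaskGroups(null);
        record.setCleanedTaskGroups(null);

        assertDoesNotThrow(record::isCleaned);
        // With the defensive version, no task groups means only the metrics
        // flag matters, so the record can still complete and be removed
        assertTrue(record.isCleaned());
    }
}
```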
---
### Issue 4: shouldCleanup Logic Duplication
**Location**:
- `JobMaster.java:902-904`
- `CoordinatorService.java:589-595`
**Problem Description**:
The same cleanup condition is duplicated across two classes:
```java
// JobMaster.java:902-904
boolean shouldCleanup =
        PipelineStatus.CANCELED.equals(pipelineStatus)
                || (PipelineStatus.FINISHED.equals(pipelineStatus) && !savepointEnd);

// CoordinatorService.java:589-595
private boolean shouldCleanup(PipelineCleanupRecord record) {
    if (record == null || record.getFinalStatus() == null) {
        return false;
    }
    if (record.isSavepointEnd()) {
        return false;
    }
    return PipelineStatus.CANCELED.equals(record.getFinalStatus())
            || PipelineStatus.FINISHED.equals(record.getFinalStatus());
}
```
This violates the DRY (Don't Repeat Yourself) principle: if the cleanup
conditions ever change, both places must be updated in lockstep. Note that
the two copies already disagree on one edge case: `JobMaster` enqueues a
CANCELED pipeline even when `savepointEnd` is true, while
`CoordinatorService.shouldCleanup()` rejects any savepoint-ended record. The
unified helper below adopts the `CoordinatorService` semantics.
**Potential Risks**:
- **Risk1**: Future modifications may miss one location, causing
inconsistency
- **Risk2**: Increased code maintenance cost
**Impact Scope**:
- **Direct Impact**: `JobMaster.enqueuePipelineCleanupIfNeeded()` and
`CoordinatorService.shouldCleanup()`
- **Affected Area**: Code maintainability
**Severity**: MINOR
**Improvement Suggestions**:
```java
// Static utility method on PipelineCleanupRecord
public static boolean shouldCleanup(PipelineStatus finalStatus, boolean isSavepointEnd) {
    if (finalStatus == null) {
        return false;
    }
    if (isSavepointEnd) {
        return false;
    }
    return PipelineStatus.CANCELED.equals(finalStatus)
            || PipelineStatus.FINISHED.equals(finalStatus);
}

// Instance method on PipelineCleanupRecord
public boolean shouldCleanup() {
    return shouldCleanup(this.finalStatus, this.savepointEnd);
}

// JobMaster.java usage
boolean shouldCleanup = PipelineCleanupRecord.shouldCleanup(pipelineStatus, savepointEnd);

// CoordinatorService.java usage
private boolean shouldCleanup(PipelineCleanupRecord record) {
    return record != null && record.shouldCleanup();
}
```
**Rationale**:
- Eliminates the duplication
- Improves maintainability
- Keeps the business rule next to the data model it concerns
---
### Issue 5: Cleanup Interval Hardcoded
**Location**: `CoordinatorService.java:113`
```java
private static final int PIPELINE_CLEANUP_INTERVAL_SECONDS = 60;
```
**Problem Description**:
The cleanup interval is hardcoded to 60 seconds and cannot be adjusted per
deployment. In production, 60 seconds may be too long (delayed resource
release) or too short (cleanup tasks run too frequently).
**Potential Risks**:
- **Risk1**: In resource-sensitive scenarios, 60-second delay may cause
memory pressure
- **Risk2**: Unable to dynamically adjust based on cluster scale
**Impact Scope**:
- **Direct Impact**: `CoordinatorService.pipelineCleanupScheduler`
scheduling frequency
- **Affected Area**: Production environment tuning
**Severity**: MINOR
**Improvement Suggestions**:
```java
// Option 1: Read the interval from the configuration
private final int pipelineCleanupIntervalSeconds;

public CoordinatorService(
        @NonNull NodeEngineImpl nodeEngine,
        @NonNull SeaTunnelServer seaTunnelServer,
        EngineConfig engineConfig) {
    // ...
    Integer configured =
            engineConfig.getCoordinatorServiceConfig() // hypothetical new config entry
                    .getPipelineCleanupIntervalSeconds();
    this.pipelineCleanupIntervalSeconds = configured != null ? configured : 60; // default 60s

    pipelineCleanupScheduler.scheduleAtFixedRate(
            this::cleanupPendingPipelines,
            this.pipelineCleanupIntervalSeconds,
            this.pipelineCleanupIntervalSeconds,
            TimeUnit.SECONDS);
}

// Option 2: Use a dynamic interval (exponential backoff), computing the next
// cleanup time per record in PipelineCleanupRecord based on attemptCount
```
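For reference, a minimal sketch of what Option 2 could look like;
`nextAttemptTimeMillis` and its getter are hypothetical additions that would
also have to be wired into `writeData()`/`readData()`:
```java
private static final long BASE_BACKOFF_MILLIS = TimeUnit.SECONDS.toMillis(60);
private static final long MAX_BACKOFF_MILLIS = TimeUnit.MINUTES.toMillis(10);

/** 60s, 120s, 240s, ... doubling per attempt, capped at 10 minutes. */
private long nextBackoffMillis(int attemptCount) {
    long backoff = BASE_BACKOFF_MILLIS << Math.min(attemptCount, 10);
    return Math.min(backoff, MAX_BACKOFF_MILLIS);
}

// In processPendingPipelineCleanup(), skip records whose turn has not come yet:
// if (System.currentTimeMillis() < record.getNextAttemptTimeMillis()) {
//     return; // hypothetical field set to now + nextBackoffMillis(attemptCount)
// }
```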
**Rationale**:
- Improves flexibility
- Supports tuning for different scenarios
- Leaves room for a future exponential backoff strategy (see the sketch
above)
---
### Issue 6: FAILED Status Pipelines Not Cleaned Up
**Location**:
- `JobMaster.java:902-904` (shouldCleanup logic)
- `SubPlan.java:242-288` (getPipelineEndState method)
**Problem Description**:
The `shouldCleanup` logic only handles the `CANCELED` and `FINISHED`
statuses and ignores `FAILED`:
```java
boolean shouldCleanup =
        PipelineStatus.CANCELED.equals(pipelineStatus)
                || (PipelineStatus.FINISHED.equals(pipelineStatus) && !savepointEnd);
```
But from the definition of `PipelineStatus.isEndState()`, `FAILED` is also
an end state:
```java
// PipelineStatus.java:76-78
public boolean isEndState() {
    return this == FINISHED || this == CANCELED || this == FAILED;
}
```
This means FAILED Pipelines are never added to the cleanup queue, so their
resources (metrics and TaskGroupContext) may never be released.
**Related Context**:
Judging from `SubPlan.getPipelineEndState()`, a Pipeline can enter FAILED
status when:
- Task execution fails (`failedTaskNum > 0`)
- A checkpoint fails
- A checkpoint fails during the cancel process
**Potential Risks**:
- **Risk1**: Metrics of FAILED Pipelines are never cleaned up (metricsImap
leak)
- **Risk2**: TaskGroupContexts of FAILED Pipelines are never cleaned up
(Worker memory leak)
- **Risk3**: Frequently failing jobs accelerate the leak
**Impact Scope**:
- **Direct Impact**: Resource release for all failed tasks
- **Indirect Impact**: Hazelcast IMap and Worker node memory
- **Affected Area**: All SeaTunnel users, especially workloads that fail
frequently
**Severity**: CRITICAL (this is a serious omission)
**Improvement Suggestions**:
```java
// JobMaster.java:902-904
boolean shouldCleanup =
        PipelineStatus.CANCELED.equals(pipelineStatus)
                || PipelineStatus.FAILED.equals(pipelineStatus)
                || (PipelineStatus.FINISHED.equals(pipelineStatus) && !savepointEnd);

// Or use isEndState() and exclude savepoint ends (note: unlike the variant
// above, this also skips CANCELED pipelines that ended with a savepoint)
boolean shouldCleanup = pipelineStatus.isEndState() && !savepointEnd;

// CoordinatorService.java:589-595
private boolean shouldCleanup(PipelineCleanupRecord record) {
    if (record == null || record.getFinalStatus() == null) {
        return false;
    }
    if (record.isSavepointEnd()) {
        return false;
    }
    // Accept all end states (FINISHED, CANCELED, FAILED)
    return record.getFinalStatus().isEndState();
}
```
**Rationale**:
- FAILED Pipelines also need their resources released
- This is likely a serious memory-leak point
- The fix should come with test cases verifying cleanup of FAILED Pipelines
(see the sketch below)
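A parameterized test along these lines could guard the condition. This is a
hypothetical JUnit 5 sketch of the second variant (`isEndState() &&
!savepointEnd`); the `PipelineStatus` package is assumed from the engine-core
module:
```java
import static org.junit.jupiter.api.Assertions.assertEquals;

import org.apache.seatunnel.engine.core.job.PipelineStatus;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;

class ShouldCleanupTest {

    @ParameterizedTest
    @CsvSource({
        "FAILED,   false, true",  // the case the current code misses
        "CANCELED, false, true",
        "FINISHED, false, true",
        "FINISHED, true,  false", // savepoint end: state must be kept
        "RUNNING,  false, false"  // not an end state yet
    })
    void shouldCleanupCoversAllEndStates(
            PipelineStatus status, boolean savepointEnd, boolean expected) {
        assertEquals(expected, status.isEndState() && !savepointEnd);
    }
}
```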
---
### Issue 7: Missing JavaDoc Documentation
**Location**: `PipelineCleanupRecord.java:39-42`
```java
@Data
@NoArgsConstructor
@AllArgsConstructor
public class PipelineCleanupRecord implements IdentifiedDataSerializable {
// No class-level JavaDoc
}
```
**Problem Description**:
The newly added `PipelineCleanupRecord` class has no class-level JavaDoc,
which makes it harder for other developers to understand its purpose and
design intent.
**Impact Scope**:
- **Direct Impact**: Code readability and maintainability
- **Affected Area**: Future developers maintaining this code
**Severity**: MINOR
**Improvement Suggestions**:
```java
/**
 * Record tracking the cleanup state of a finished pipeline.
 *
 * <p>This record is persisted in the Hazelcast IMap
 * {@code IMAP_PENDING_PIPELINE_CLEANUP} and used by the background cleanup
 * task in {@link CoordinatorService} to asynchronously release resources
 * when the synchronous cleanup fails.
 *
 * <p><b>Resources tracked:</b>
 *
 * <ul>
 *   <li>{@link #metricsImapCleaned} - metrics in {@code IMAP_RUNNING_JOB_METRICS}
 *   <li>{@link #taskGroups} - TaskGroups with their worker addresses
 *   <li>{@link #cleanedTaskGroups} - TaskGroups whose contexts have been cleaned
 * </ul>
 *
 * <p><b>Cleanup conditions:</b>
 *
 * <ul>
 *   <li>CANCELED pipelines are always cleaned
 *   <li>FINISHED pipelines are cleaned unless they ended with a savepoint
 *   <li>FAILED pipelines are cleaned (note: original code may have missed this)
 * </ul>
 *
 * <p><b>Lifecycle:</b>
 *
 * <ol>
 *   <li>Created by {@link JobMaster#enqueuePipelineCleanupIfNeeded}
 *   <li>Updated by {@link CoordinatorService#processPendingPipelineCleanup}
 *   <li>Removed when {@link #isCleaned()} returns true
 * </ol>
 *
 * @see PipelineLocation
 * @see PipelineStatus
 * @see org.apache.seatunnel.engine.server.task.operation.CleanTaskGroupContextOperation
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class PipelineCleanupRecord implements IdentifiedDataSerializable {
    // ...
}
```
**Rationale**:
- Apache top-level projects require good documentation
- Helps other developers quickly understand the design intent
- Records key architectural decisions
---