DanielCarter-stack commented on PR #10580:
URL: https://github.com/apache/seatunnel/pull/10580#issuecomment-4021660601
<!-- code-pr-reviewer -->
<!-- cpr:pr_reply_v2_parts {"group": "apache/seatunnel#10580", "part": 1,
"total": 1} -->
## Issue 1: Incomplete Resource Cleanup Leading to Potential Leaks
**Location**: `CoordinatorService.java:559-567`
```java
} catch (Exception e) {
    isActive = false;
    logger.severe("check new active master error, will retry later.", e);
    try {
        clearCoordinatorService();
    } catch (Exception ex) {
        logger.warning("clear coordinator service failed.", ex);
    }
}
```
**Related Context**:
- `initCoordinatorService()`: lines 404-447
  - Initializes `jobHistoryService` (lines 415-429)
  - Initializes `eventProcessor` (lines 430-434)
  - Initializes `connectorPackageService` (lines 437-440)
  - Starts `restoreAllJobFromMasterNodeSwitchFuture` (lines 443-446)
- `clearCoordinatorService()`: lines 570-604
  - Only cleans up `runningJobMasterMap`, `pendingJobQueue`, `executorService`, `resourceManager`, `eventProcessor`
  - **Does NOT clean up `jobHistoryService` and `connectorPackageService`**
**Problem Description**:
If an exception is thrown at different stages during
`initCoordinatorService()` execution:
- Scenario A: `jobHistoryService` created successfully, but `eventProcessor`
creation fails → `jobHistoryService` will not be cleaned up
- Scenario B: `restoreAllJobFromMasterNodeSwitchFuture` has been submitted
but subsequent code fails → async tasks may still be running
- Scenario C: `connectorPackageService` created successfully but subsequent
steps fail → service will not be cleaned up
This leads to:
1. `jobHistoryService` holds stale IMap references (such as
`runningJobStateIMap`, `finishedJobStateIMap`), potentially occupying memory
2. `connectorPackageService` may hold unreleased resources
3. `restoreAllJobFromMasterNodeSwitchFuture` reference itself is not
cleared, but the executor has been shut down
**Potential Risks**:
- **Risk 1**: Memory leak. Each retry may create a new `jobHistoryService`, while the old one is never cleaned up
- **Risk 2**: Resource leak. `connectorPackageService` may hold file handles or network connections
- **Risk 3**: State inconsistency. If a retry succeeds, the old `jobHistoryService` may still be using stale IMap references
**Impact Scope**:
- **Direct Impact**: Lifecycle management of `CoordinatorService`
- **Indirect Impact**: Repeated failures cause memory usage to grow steadily
- **Affected Area**: Core framework, affecting all users running in cluster mode
**Severity**: **MAJOR**
**Improvement Suggestions**:
```java
// Added in clearCoordinatorService()
public synchronized void clearCoordinatorService() {
    // interrupt all JobMaster
    runningJobMasterMap.values().forEach(JobMaster::interrupt);
    if (isWaitStrategy) {
        pendingJobQueue
                .getJobIdMap()
                .values()
                .forEach(
                        pendingJobInfo -> {
                            JobMaster jobMaster = pendingJobInfo.getJobMaster();
                            jobMaster.interrupt();
                        });
        pendingJobQueue.clear();
    }
    executorService.shutdownNow();
    runningJobMasterMap.clear();
    try {
        executorService.awaitTermination(20, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
        throw new SeaTunnelEngineException("wait clean executor service error", e);
    }

    // Added: clean up jobHistoryService
    jobHistoryService = null;

    // Added: clean up connectorPackageService
    if (connectorPackageService != null) {
        try {
            // If ConnectorPackageService has a close() method, call it
            // connectorPackageService.close();
        } catch (Exception e) {
            logger.warning("Failed to close connector package service", e);
        }
        connectorPackageService = null;
    }

    // Added: clean up restoreAllJobFromMasterNodeSwitchFuture
    restoreAllJobFromMasterNodeSwitchFuture = null;

    if (resourceManager != null) {
        resourceManager.close();
    }
    try {
        if (eventProcessor != null) {
            eventProcessor.close();
            eventProcessor = null; // Added: clear reference
        }
    } catch (Exception e) {
        throw new SeaTunnelEngineException("close event processor error", e);
    }
}
```
**Rationale**:
- Ensure all resources initialized in `initCoordinatorService()` are cleaned
up
- Prevent memory leaks and resource leaks
- Ensure a clean initial state for the next retry
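To keep the cleanup tolerant of individual failures (so one failing `close()` does not skip the rest), a small helper can close each resource independently and collect errors. This is only a sketch; `closeQuietly` is a hypothetical utility, not an existing SeaTunnel method:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper: close each resource independently, collecting failures
// instead of letting the first exception abort the remaining cleanup.
public class CleanupHelper {
    public static List<Exception> closeQuietly(
            Consumer<Exception> onError, AutoCloseable... resources) {
        List<Exception> failures = new ArrayList<>();
        for (AutoCloseable resource : resources) {
            if (resource == null) {
                continue; // never initialized, nothing to clean up
            }
            try {
                resource.close();
            } catch (Exception e) {
                failures.add(e);
                onError.accept(e); // e.g. logger.warning(...)
            }
        }
        return failures;
    }

    public static void main(String[] args) {
        AutoCloseable ok = () -> {};
        AutoCloseable broken = () -> { throw new IllegalStateException("boom"); };
        List<Exception> failures = closeQuietly(e -> {}, ok, broken, null, ok);
        System.out.println(failures.size()); // 1: only the broken resource failed
    }
}
```

`clearCoordinatorService()` could then close `eventProcessor`, `resourceManager`, and (if it gains a `close()`) `connectorPackageService` in one call and only null out the fields afterwards.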
---
## Issue 2: Interruption Risk of Async Recovery Tasks
**Location**: `CoordinatorService.java:443-446, 559-567`
```java
// In initCoordinatorService()
restoreAllJobFromMasterNodeSwitchFuture =
        new PassiveCompletableFuture(
                CompletableFuture.runAsync(
                        this::restoreAllRunningJobFromMasterNodeSwitch,
                        executorService));

// In checkNewActiveMaster() exception handling
} catch (Exception e) {
    isActive = false;
    logger.severe("check new active master error, will retry later.", e);
    try {
        clearCoordinatorService(); // Will call executorService.shutdownNow()
    } catch (Exception ex) {
        logger.warning("clear coordinator service failed.", ex);
    }
}
```
**Related Context**:
- `waitForJobComplete()` (lines 733-759) depends on `restoreAllJobFromMasterNodeSwitchFuture.join()`
- `restoreAllRunningJobFromMasterNodeSwitch()` (lines 449-504) recovers running jobs
- `restoreJobFromMasterActiveSwitch()` (lines 506-537) creates a new JobMaster and adds it to the pending queue
**Problem Description**:
This is a **serious concurrency timing issue**:
1. `initCoordinatorService()` submits async recovery tasks to
`executorService`
2. Async tasks start executing, possibly performing the following operations:
- Reading job information from `runningJobInfoIMap`
- Creating new `JobMaster` instances
- Adding jobs to `pendingJobQueue`
3. If subsequent code in `initCoordinatorService()` throws an exception
during async task execution
4. `clearCoordinatorService()` calls `executorService.shutdownNow()`
5. **Recovery tasks in progress are forcibly interrupted**
6. This may lead to:
   - Some jobs have already been added to `pendingJobQueue`
   - Some jobs have already been added to `runningJobMasterMap`
   - But the `restoreAllJobFromMasterNodeSwitchFuture` reference has been cleared (if fixed per Issue 1's suggestion)
   - The next call to `waitForJobComplete()` will make `join()` fail or block
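The interruption in steps 4-5 can be reproduced in isolation. A self-contained sketch (all names here are illustrative stand-ins, not SeaTunnel APIs) showing how `shutdownNow()` aborts an in-flight task while the partial state it already published survives:

```java
import java.util.Queue;
import java.util.concurrent.*;

// Demonstrates: a task submitted via runAsync publishes partial state, then
// gets interrupted by shutdownNow(); the partial state is left behind.
public class ShutdownNowDemo {
    public static String run() throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Queue<String> pendingQueue = new ConcurrentLinkedQueue<>();
        CountDownLatch started = new CountDownLatch(1);

        // Simulates the async restore task: enqueues one job, then keeps working.
        CompletableFuture<Void> restoreTask =
                CompletableFuture.runAsync(
                        () -> {
                            pendingQueue.add("job-1"); // partial state is published
                            started.countDown();
                            try {
                                Thread.sleep(60_000); // "restoring" more jobs
                            } catch (InterruptedException e) {
                                // shutdownNow() lands here, mid-recovery
                                Thread.currentThread().interrupt();
                                throw new CompletionException(e);
                            }
                        },
                        executor);

        started.await();
        executor.shutdownNow(); // interrupts the in-flight restore
        executor.awaitTermination(5, TimeUnit.SECONDS);

        boolean interrupted = false;
        try {
            restoreTask.get(5, TimeUnit.SECONDS);
        } catch (ExecutionException e) {
            interrupted = true; // the restore never finished
        }
        // "job-1" stays queued even though recovery was aborted
        return "interrupted=" + interrupted + ", queued=" + pendingQueue.size();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run()); // interrupted=true, queued=1
    }
}
```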
**Potential Risks**:
- **Risk 1**: Job state inconsistency. Some jobs are marked as recovered,
but recovery process is incomplete
- **Risk 2**: `waitForJobComplete()` may permanently block or fail, as it
depends on `restoreAllJobFromMasterNodeSwitchFuture.join()`
- **Risk 3**: Incomplete job information may exist in `pendingJobQueue` and
`runningJobMasterMap`
**Impact Scope**:
- **Direct Impact**: Job recovery process during Master node switching
- **Indirect Impact**: Users querying job status through
`waitForJobComplete()` may fail
- **Affected Area**: Core framework, all scenarios with running jobs during
Master switch
**Severity**: **CRITICAL**
**Improvement Suggestions**:
```java
private void initCoordinatorService() {
    runningJobInfoIMap =
            nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_INFO);
    runningJobStateIMap =
            nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_STATE);
    runningJobStateTimestampsIMap =
            nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_STATE_TIMESTAMPS);
    ownedSlotProfilesIMap =
            nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_OWNED_SLOT_PROFILES);
    metricsImap =
            nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_METRICS);

    jobHistoryService =
            new JobHistoryService(
                    nodeEngine,
                    runningJobStateIMap,
                    logger,
                    pendingJobQueue.getJobIdMap(),
                    runningJobMasterMap,
                    nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_FINISHED_JOB_STATE),
                    nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_FINISHED_JOB_METRICS),
                    nodeEngine
                            .getHazelcastInstance()
                            .getMap(Constant.IMAP_FINISHED_JOB_VERTEX_INFO),
                    engineConfig.getHistoryJobExpireMinutes());

    eventProcessor =
            createJobEventProcessor(
                    engineConfig.getEventReportHttpApi(),
                    engineConfig.getEventReportHttpHeaders(),
                    nodeEngine);

    // If the user has configured the connector package service, create it on the master node.
    ConnectorJarStorageConfig connectorJarStorageConfig =
            engineConfig.getConnectorJarStorageConfig();
    if (connectorJarStorageConfig.getEnable()) {
        connectorPackageService = new ConnectorPackageService(seaTunnelServer);
    }

    // Modified: move async task creation to the end, ensure all preceding
    // initializations succeed before creating
    restoreAllJobFromMasterNodeSwitchFuture =
            new PassiveCompletableFuture(
                    CompletableFuture.runAsync(
                            this::restoreAllRunningJobFromMasterNodeSwitch,
                            executorService));
}

// In the exception handling of checkNewActiveMaster()
} catch (Exception e) {
    isActive = false;
    logger.severe("check new active master error, will retry later.", e);
    try {
        // Added: wait for restore task to complete or cancel before cleanup
        if (restoreAllJobFromMasterNodeSwitchFuture != null) {
            try {
                // Brief wait to avoid blocking too long
                restoreAllJobFromMasterNodeSwitchFuture.get(1, TimeUnit.SECONDS);
            } catch (TimeoutException ex) {
                logger.warning("Restore task timeout during cleanup, will be interrupted", ex);
            } catch (ExecutionException | InterruptedException ex) {
                logger.warning("Restore task failed during cleanup", ex);
            }
        }
        clearCoordinatorService();
    } catch (Exception ex) {
        logger.warning("clear coordinator service failed.", ex);
    }
}
```
**Rationale**:
- Ensure `restoreAllJobFromMasterNodeSwitchFuture` is created only after all other initializations succeed
- Give the recovery task a brief window (1 second) to complete before cleanup, avoiding needless interruptions
- If the timeout does expire, proceed with cleanup and accept the consequences of an interrupted recovery
- Alternatively, consider splitting `initCoordinatorService()` into phases that can each be rolled back independently

**Alternative**: Move the creation of `restoreAllJobFromMasterNodeSwitchFuture` out of `initCoordinatorService()` and create it only after `isActive = true` has been confirmed.
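A minimal sketch of that alternative, using hypothetical simplified fields (`isActive`, `restoreFuture`, `becomeMaster`) rather than the real `CoordinatorService` API: the restore future is only created once init has fully succeeded, so a failed init can never leave an orphaned async task behind.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, simplified coordinator illustrating deferred restore creation.
public class DeferredRestoreDemo {
    private final ExecutorService executorService = Executors.newSingleThreadExecutor();
    private final AtomicBoolean isActive = new AtomicBoolean(false);
    private volatile CompletableFuture<Void> restoreFuture; // created late

    void becomeMaster(boolean initShouldFail) {
        initCoordinatorService(initShouldFail); // may throw
        isActive.set(true);
        // Only now is the async recovery started.
        restoreFuture =
                CompletableFuture.runAsync(this::restoreAllRunningJobs, executorService);
    }

    private void initCoordinatorService(boolean fail) {
        if (fail) {
            throw new IllegalStateException("init failed");
        }
        // real init work would happen here
    }

    private void restoreAllRunningJobs() {
        // restore work would happen here
    }

    boolean hasRestoreTask() {
        return restoreFuture != null;
    }

    void shutdown() {
        executorService.shutdownNow();
    }

    public static void main(String[] args) {
        DeferredRestoreDemo demo = new DeferredRestoreDemo();
        try {
            demo.becomeMaster(true);
        } catch (IllegalStateException expected) {
            // init failed before any async task was submitted
        }
        System.out.println("after failed init: " + demo.hasRestoreTask());
        demo.becomeMaster(false);
        System.out.println("after successful init: " + demo.hasRestoreTask());
        demo.shutdown();
    }
}
```

With this ordering, `clearCoordinatorService()` never has to race against a half-started recovery from a failed init.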
---
## Issue 3: Lack of Retry Backoff Strategy and Circuit Breaker Mechanism
**Location**: `CoordinatorService.java:218-219, 559-567`
```java
// In constructor
masterActiveListener.scheduleAtFixedRate(
        this::checkNewActiveMaster, 0, 100, TimeUnit.MILLISECONDS);

// In exception handling
} catch (Exception e) {
    isActive = false;
    logger.severe("check new active master error, will retry later.", e);
    try {
        clearCoordinatorService();
    } catch (Exception ex) {
        logger.warning("clear coordinator service failed.", ex);
    }
}
```
**Related Context**:
- Hazelcast's `RetryableHazelcastException` usually only succeeds after
multiple retries
- If it's a configuration error or permanent failure, continuous retry is
meaningless
- Frequent retries consume CPU and generate large amounts of logs
**Problem Description**:
The current implementation retries **indefinitely at a fixed interval**:
- Retries every 100ms with no maximum retry limit
- No backoff strategy
- No distinction between transient and permanent errors
This leads to the following problems:
1. **Log flooding**: If error persists, 10 SEVERE logs will be generated per
second
2. **Resource waste**: Frequent `clearCoordinatorService()` and
`initCoordinatorService()` calls consume CPU
3. **Lack of observability**: Cannot know from outside how many retries have
occurred
4. **No alerting on permanent errors**: a configuration error results in endless retries
**Potential Risks**:
- **Risk 1**: Disk filled with logs (if error persists)
- **Risk 2**: Excessive CPU usage (frequent initialization and cleanup)
- **Risk 3**: Operations personnel cannot promptly detect real failures
(logs drowned out)
**Impact Scope**:
- **Direct Impact**: Health monitoring of `CoordinatorService`
- **Indirect Impact**: Overall cluster performance and stability
- **Affected Area**: Core framework
**Severity**: **MAJOR**
**Improvement Suggestions**:
```java
// Add fields in CoordinatorService
private final AtomicLong initFailureCount = new AtomicLong(0);
private static final long MAX_INIT_FAILURES_BEFORE_BACKOFF = 10;
private static final long BACKOFF_INTERVAL_MS = 5000; // 5 seconds

private void checkNewActiveMaster() {
    try {
        if (!isActive && this.seaTunnelServer.isMasterNode()) {
            logger.info(
                    "This node become a new active master node, begin init coordinator service");
            if (this.executorService.isShutdown()) {
                this.executorService =
                        Executors.newCachedThreadPool(
                                new ThreadFactoryBuilder()
                                        .setNameFormat("seatunnel-coordinator-service-%d")
                                        .build());
            }
            initCoordinatorService();
            isActive = true;
            initFailureCount.set(0); // Reset failure count
        } else if (isActive && !this.seaTunnelServer.isMasterNode()) {
            isActive = false;
            logger.info(
                    "This node become leave active master node, begin clear coordinator service");
            clearCoordinatorService();
        }
    } catch (Exception e) {
        isActive = false;
        long failureCount = initFailureCount.incrementAndGet();
        // Determine whether backoff is needed
        if (failureCount >= MAX_INIT_FAILURES_BEFORE_BACKOFF) {
            logger.severe(
                    String.format(
                            "CoordinatorService initialization failed after %d attempts (error: %s). "
                                    + "Entering backoff mode, will retry in %dms. Node: %s, Master: %s",
                            failureCount,
                            e.getClass().getSimpleName(),
                            BACKOFF_INTERVAL_MS,
                            nodeEngine.getThisAddress(),
                            nodeEngine.getMasterAddress()),
                    e);
            // An alert could be sent here
            // alertService.sendAlert("CoordinatorService init failed");
            // Wait for a period of time before the next attempt
            try {
                Thread.sleep(BACKOFF_INTERVAL_MS);
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                logger.warning("Backoff sleep interrupted", ie);
            }
        } else {
            logger.severe(
                    String.format(
                            "CoordinatorService initialization failed (attempt %d/%d): %s. Will retry in %dms",
                            failureCount,
                            MAX_INIT_FAILURES_BEFORE_BACKOFF,
                            e.getMessage(),
                            100),
                    e);
        }
        try {
            clearCoordinatorService();
        } catch (Exception ex) {
            logger.warning("clear coordinator service failed.", ex);
        }
    }
}

// Add a metrics accessor for monitoring
public long getInitFailureCount() {
    return initFailureCount.get();
}
```
**Rationale**:
- Avoid log flooding and resource waste
- Provide better observability
- Distinguish between transient errors and permanent errors
- Follow distributed-systems retry best practices (the sample above applies a fixed backoff after repeated failures; an exponential schedule is a natural extension)
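If full exponential backoff is preferred over the fixed 5-second pause, the delay schedule can be isolated in a small policy class. This is a generic sketch (`BackoffPolicy`, `delayFor` and the constants are illustrative, not SeaTunnel code); jitter is added so multiple nodes do not retry in lockstep:

```java
import java.util.concurrent.ThreadLocalRandom;

// Capped exponential backoff with jitter: 100ms, 200ms, 400ms, ... up to 30s.
public class BackoffPolicy {
    private static final long BASE_DELAY_MS = 100;   // first retry delay
    private static final long MAX_DELAY_MS = 30_000; // cap

    /** Delay before retry attempt {@code failureCount} (1-based), without jitter. */
    public static long delayFor(long failureCount) {
        long shift = Math.max(0, Math.min(failureCount - 1, 20)); // avoid overflow
        return Math.min(BASE_DELAY_MS << shift, MAX_DELAY_MS);
    }

    /** Same delay with +/-20% jitter to avoid synchronized retries across nodes. */
    public static long jitteredDelayFor(long failureCount) {
        long delay = delayFor(failureCount);
        long jitter =
                (long) (delay * 0.2 * (ThreadLocalRandom.current().nextDouble() * 2 - 1));
        return Math.max(0, delay + jitter);
    }

    public static void main(String[] args) {
        for (long attempt = 1; attempt <= 12; attempt++) {
            System.out.println("attempt " + attempt + " -> " + delayFor(attempt) + "ms");
        }
    }
}
```

`checkNewActiveMaster()` could then sleep `jitteredDelayFor(failureCount)` instead of a constant, keeping the 100ms scheduler tick as the floor.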
---
## Issue 4: Insufficient Log Information, Lacking Context
**Location**: `CoordinatorService.java:561`
```java
logger.severe("check new active master error, will retry later.", e);
```
**Related Context**:
- Operations personnel need to quickly identify whether it's a network
issue, configuration issue, or Hazelcast issue
- Need to know current node status and cluster status
- Need to know which retry attempt this is
**Problem Description**:
Current log information lacks key context:
1. No node address information
2. No Master address information
3. No retry count
4. No concise description of exception type
**Potential Risks**:
- **Risk 1**: Difficult problem localization, need to check full stack trace
to understand exception type
- **Risk 2**: Cannot quickly determine if it's a widespread issue or
specific to a node
- **Risk 3**: Cannot assess retry trends
**Impact Scope**:
- **Direct Impact**: Observability and problem diagnosis efficiency
- **Affected Area**: Operations and development personnel
**Severity**: **MINOR**
**Improvement Suggestions**:
```java
} catch (Exception e) {
    isActive = false;
    long failureCount = initFailureCount.incrementAndGet();
    Throwable rootCause = ExceptionUtils.getRootCause(e);
    String errorType = rootCause.getClass().getSimpleName();
    String errorMsg = rootCause.getMessage();
    logger.severe(
            String.format(
                    "CoordinatorService initialization failed (attempt #%d). "
                            + "Node: %s, Cluster Master: %s, Error Type: %s, Error: %s. "
                            + "Will retry in %dms.",
                    failureCount,
                    nodeEngine.getThisAddress(),
                    nodeEngine.getMasterAddress(),
                    errorType,
                    errorMsg != null && errorMsg.length() > 100
                            ? errorMsg.substring(0, 100) + "..."
                            : errorMsg,
                    100),
            e);
    try {
        clearCoordinatorService();
    } catch (Exception ex) {
        logger.warning(
                String.format(
                        "Failed to clear CoordinatorService after initialization failure (node: %s)",
                        nodeEngine.getThisAddress()),
                ex);
    }
}
```
**Rationale**:
- Provide richer context information
- Help quickly locate problems
- Follow production environment logging best practices
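The root-cause extraction and message truncation used above can also be done without a Commons Lang dependency. A self-contained sketch with hypothetical helper names (`rootCauseOf`, `truncate`):

```java
// Minimal stand-ins for root-cause extraction and message truncation, so the
// log stays one readable line; the helper names are illustrative.
public class LogContextUtil {
    /** Walks the cause chain and returns the deepest cause. */
    public static Throwable rootCauseOf(Throwable t) {
        Throwable current = t;
        while (current.getCause() != null && current.getCause() != current) {
            current = current.getCause();
        }
        return current;
    }

    /** Truncates long messages to keep a single log line readable. */
    public static String truncate(String msg, int maxLen) {
        if (msg == null || msg.length() <= maxLen) {
            return msg;
        }
        return msg.substring(0, maxLen) + "...";
    }

    public static void main(String[] args) {
        Exception root = new java.net.ConnectException("connection refused");
        Exception wrapped =
                new RuntimeException("init failed", new IllegalStateException(root));
        System.out.println(rootCauseOf(wrapped).getClass().getSimpleName()); // ConnectException
        System.out.println(truncate("x".repeat(150), 100).length());         // 103
    }
}
```

Unlike some `ExceptionUtils.getRootCause` variants, `rootCauseOf` returns the throwable itself when there is no cause, which avoids a null check at the call site.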
---
## Issue 5: Missing Cleanup of jobHistoryService in clearCoordinatorService()
**Location**: `CoordinatorService.java:570-604`
```java
public synchronized void clearCoordinatorService() {
    // interrupt all JobMaster
    runningJobMasterMap.values().forEach(JobMaster::interrupt);
    // ...
    executorService.shutdownNow();
    runningJobMasterMap.clear();
    try {
        executorService.awaitTermination(20, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
        throw new SeaTunnelEngineException("wait clean executor service error", e);
    }
    if (resourceManager != null) {
        resourceManager.close();
    }
    try {
        if (eventProcessor != null) {
            eventProcessor.close();
        }
    } catch (Exception e) {
        throw new SeaTunnelEngineException("close event processor error", e);
    }
    // Missing cleanup for jobHistoryService, connectorPackageService,
    // and restoreAllJobFromMasterNodeSwitchFuture
}
```
**Related Context**:
- `initCoordinatorService()` creates `jobHistoryService` at lines 415-429
- `initCoordinatorService()` creates `connectorPackageService` at lines 437-440
- `initCoordinatorService()` creates `restoreAllJobFromMasterNodeSwitchFuture` at lines 443-446
**Problem Description**:
`clearCoordinatorService()` does not clean up all resources initialized in
`initCoordinatorService()`. This is related to Issue 1, but emphasizes the
**symmetry** issue:
- If `init` creates resources, `clear` should clean them up
- Current code is asymmetric, leading to partial resource leaks
**Potential Risks**:
- **Risk 1**: Memory leak
- **Risk 2**: `jobHistoryService` may hold references to closed IMap,
causing errors on next use
**Impact Scope**:
- **Direct Impact**: Resource management
- **Affected Area**: Core framework
**Severity**: **MAJOR** (together with Issue 1, this forms one complete problem)
**Improvement Suggestions**: See Issue 1's improvement suggestions
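One way to enforce this symmetry structurally is a cleanup stack: each resource registers its teardown as it is created, and `clear` (or a failed `init`) runs the teardowns in reverse creation order. A minimal sketch with hypothetical names (`initWithRollback`, `cleanups`), not the actual SeaTunnel code:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Sketch of symmetric init/clear: every successfully created resource pushes
// its teardown onto a stack, so init and clear can never drift apart.
public class RollbackStackDemo {
    private final Deque<Runnable> cleanups = new ArrayDeque<>();
    private final StringBuilder log = new StringBuilder();

    void initWithRollback(boolean failAfterSecond) {
        try {
            create("jobHistoryService");
            create("eventProcessor");
            if (failAfterSecond) {
                throw new IllegalStateException("init failed mid-way");
            }
            create("connectorPackageService");
        } catch (RuntimeException e) {
            clear(); // roll back whatever was created so far
            throw e;
        }
    }

    private void create(String name) {
        log.append("create:").append(name).append(';');
        // Register teardown immediately after successful creation.
        cleanups.push(() -> log.append("close:").append(name).append(';'));
    }

    void clear() {
        while (!cleanups.isEmpty()) {
            cleanups.pop().run(); // reverse creation order
        }
    }

    String trace() {
        return log.toString();
    }

    public static void main(String[] args) {
        RollbackStackDemo demo = new RollbackStackDemo();
        try {
            demo.initWithRollback(true);
        } catch (IllegalStateException expected) {
        }
        System.out.println(demo.trace());
        // create:jobHistoryService;create:eventProcessor;close:eventProcessor;close:jobHistoryService;
    }
}
```

With this pattern, adding a new resource to `init` automatically adds its cleanup, so the asymmetry described above cannot recur.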
---