DanielCarter-stack commented on PR #10506:
URL: https://github.com/apache/seatunnel/pull/10506#issuecomment-3941608493
<!-- code-pr-reviewer -->
<!-- cpr:pr_reply_v2_parts {"group": "apache/seatunnel#10506", "part": 1,
"total": 1} -->
### Issue 1: Improper exception handling when Restore fails
**Location**: `CoordinatorService.java:898-901`
```java
if (runningJobMaster == null) {
    restoreAllJobFromMasterNodeSwitchFuture.join(); // may throw CompletionException
    runningJobMaster = runningJobMasterMap.get(taskGroupLocation.getJobId());
}
```
**Related context**:
- `restoreAllRunningJobFromMasterNodeSwitch()` (lines 449-504): Throws
`SeaTunnelEngineException` on exception
- `TaskExecutionService.notifyTaskStatusToMaster()` (lines 442-480): Expects
to catch `JobNotFoundException`
**Issue description**:
If `restoreAllJobFromMasterNodeSwitchFuture` completes exceptionally due to
restore failure, `.join()` will throw `CompletionException` (wrapping
`SeaTunnelEngineException`), instead of the expected `JobNotFoundException`.
This leads to:
1. Worker-side retry logic becomes ineffective: `notifyTaskStatusToMaster()`
only catches `JobNotFoundException`; a `CompletionException` falls into the
generic exception branch and keeps retrying indefinitely
2. Log pollution: A large number of retries will generate warning logs
3. Semantic confusion: restore failure and job not found are two different
errors and should be handled separately
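The wrapping behavior is easy to reproduce in isolation (a minimal standalone sketch; `IllegalStateException` stands in for `SeaTunnelEngineException`, which is not on the classpath here):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class JoinWrappingDemo {
    public static void main(String[] args) {
        CompletableFuture<Void> restoreFuture = new CompletableFuture<>();
        // Stand-in for the SeaTunnelEngineException thrown during restore
        restoreFuture.completeExceptionally(new IllegalStateException("restore failed"));
        try {
            restoreFuture.join();
        } catch (CompletionException e) {
            // join() rethrows the failure wrapped in CompletionException, so a
            // worker-side `catch (JobNotFoundException)` would never match here.
            System.out.println(e.getCause().getMessage()); // prints "restore failed"
        }
    }
}
```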
**Potential risks**:
- **Risk 1**: After restore failure, all task status updates will fail and
continuously retry, affecting system stability
- **Risk 2**: Error logs will mislead operations personnel, as it looks like
"job not found" but is actually "restore failed"
**Impact scope**:
- **Direct impact**: `CoordinatorService.updateTaskExecutionState()` method
- **Indirect impact**: `TaskExecutionService.notifyTaskStatusToMaster()`
retry logic
- **Affected area**: Zeta core framework, all jobs in scenarios where master
switches and restore fails
**Severity**: **MAJOR**
**Improvement suggestion**:
```java
// CoordinatorService.java:890-907
public void updateTaskExecutionState(TaskExecutionState taskExecutionState) {
    logger.info(
            String.format(
                    "Received task end from execution %s, state %s",
                    taskExecutionState.getTaskGroupLocation(),
                    taskExecutionState.getExecutionState()));
    TaskGroupLocation taskGroupLocation = taskExecutionState.getTaskGroupLocation();
    JobMaster runningJobMaster = runningJobMasterMap.get(taskGroupLocation.getJobId());
    if (runningJobMaster == null) {
        try {
            // Wait for job restoration to complete during master node switch
            restoreAllJobFromMasterNodeSwitchFuture.join();
        } catch (Exception e) {
            // If restore failed, treat it as job not found and log the error
            logger.warning(
                    String.format(
                            "Failed to wait for restore completion for job %s: %s",
                            taskGroupLocation.getJobId(),
                            ExceptionUtils.getMessage(e)));
            throw new JobNotFoundException(
                    String.format(
                            "Job %s not running (restore failed)",
                            taskGroupLocation.getJobId()),
                    e);
        }
        runningJobMaster = runningJobMasterMap.get(taskGroupLocation.getJobId());
    }
    if (runningJobMaster == null) {
        throw new JobNotFoundException(
                String.format("Job %s not running", taskGroupLocation.getJobId()));
    }
    runningJobMaster.updateTaskExecutionState(taskExecutionState);
}
```
**Rationale**:
- Catch all exceptions that `.join()` may throw, convert to
`JobNotFoundException`
- Keep worker-side retry logic working normally
- Clearly distinguish between "restore failed" and "job not found" in logs
---
### Issue 2: Missing critical logs and comments
**Location**: `CoordinatorService.java:898-901`
```java
if (runningJobMaster == null) {
    restoreAllJobFromMasterNodeSwitchFuture.join();
    runningJobMaster = runningJobMasterMap.get(taskGroupLocation.getJobId());
}
```
**Related context**:
- Method JavaDoc: None
- Caller: `NotifyTaskStatusOperation.runInternal()` (lines 71-74)
**Issue description**:
The newly added waiting logic has no comments explaining it, and no logs are
recorded when triggered. This leads to:
1. **Poor maintainability**: Future reviewers or maintainers won't understand
why the future must be joined here
2. **Weak diagnosability**: When status updates are delayed due to master
switch, it's impossible to see from logs that we're waiting for restore
3. **Difficult debugging**: When problems occur in production, it's
impossible to determine whether this waiting branch was entered
**Potential risks**:
- **Risk 1**: Future developers may mistakenly think this is an unnecessary
block and remove it
- **Risk 2**: During performance tuning, unable to accurately assess the
impact of master switch on status updates
**Impact scope**:
- **Direct impact**: Code maintainability and observability
- **Indirect impact**: Problem diagnosis efficiency
- **Affected area**: Only affects
`CoordinatorService.updateTaskExecutionState()` method
**Severity**: **MINOR**
**Improvement suggestion**:
```java
public void updateTaskExecutionState(TaskExecutionState taskExecutionState) {
    logger.info(
            String.format(
                    "Received task end from execution %s, state %s",
                    taskExecutionState.getTaskGroupLocation(),
                    taskExecutionState.getExecutionState()));
    TaskGroupLocation taskGroupLocation = taskExecutionState.getTaskGroupLocation();
    JobMaster runningJobMaster = runningJobMasterMap.get(taskGroupLocation.getJobId());
    // During master node switch, jobs are being restored asynchronously.
    // If a job is not found in the runningJobMasterMap, wait for the restoration
    // to complete before throwing JobNotFoundException. This prevents false negatives
    // when workers report task status during the restore window.
    if (runningJobMaster == null) {
        logger.info(
                String.format(
                        "Job %s not found in runningJobMasterMap, waiting for restore completion",
                        taskGroupLocation.getJobId()));
        restoreAllJobFromMasterNodeSwitchFuture.join();
        runningJobMaster = runningJobMasterMap.get(taskGroupLocation.getJobId());
        logger.info(
                String.format(
                        "Restore completed, job %s %s",
                        taskGroupLocation.getJobId(),
                        runningJobMaster != null ? "found" : "still not found"));
    }
    if (runningJobMaster == null) {
        throw new JobNotFoundException(
                String.format("Job %s not running", taskGroupLocation.getJobId()));
    }
    runningJobMaster.updateTaskExecutionState(taskExecutionState);
}
```
**Rationale**:
- Clear comments explain why waiting is necessary
- Logs can help diagnose status update behavior during master switch
- Facilitates performance analysis and troubleshooting
---
### Issue 3: Insufficient test coverage: Restore failure scenarios
**Location**: `CoordinatorServiceTest.java:472-562`
**Related context**:
- `testUpdateTaskExecutionStateWaitsForRestoreCompletion()` (lines 472-542)
- `testUpdateTaskExecutionStateThrowsWhenJobNotFound()` (lines 545-562)
**Issue description**:
Existing tests only cover restore success scenarios, but not restore failure
scenarios. Specifically:
1. **Missing exceptional completion test**: No test for behavior when
`restoreAllJobFromMasterNodeSwitchFuture` completes exceptionally
2. **Edge cases**: No test for scenario where restore completes but job is
still not found (e.g., job was cancelled during restore)
3. **Concurrency scenarios**: No test for behavior when multiple threads
call `updateTaskExecutionState()` simultaneously
**Potential risks**:
- **Risk 1**: The exception handling issue described in Issue 1 may not be
discovered in tests and go directly to production
- **Risk 2**: Future code changes may break this fix, and tests won't detect
it
**Impact scope**:
- **Direct impact**: Test coverage
- **Indirect impact**: Code quality assurance
- **Affected area**: `CoordinatorService.updateTaskExecutionState()` method
**Severity**: **MINOR**
**Improvement suggestion**:
```java
@Test
void testUpdateTaskExecutionStateWhenRestoreFailed() throws Exception {
    JobInformation jobInformation =
            submitJob(
                    "CoordinatorServiceTest_testUpdateTaskExecutionStateWhenRestoreFailed",
                    "batch_fake_to_console.conf",
                    "test_update_task_execution_state_when_restore_failed");
    CoordinatorService coordinatorService = jobInformation.coordinatorService;
    await().atMost(30000, TimeUnit.MILLISECONDS)
            .untilAsserted(
                    () ->
                            Assertions.assertEquals(
                                    JobStatus.RUNNING,
                                    coordinatorService.getJobStatus(jobInformation.jobId)));
    // Remove job from runningJobMasterMap to simulate master switch
    Field mapField = CoordinatorService.class.getDeclaredField("runningJobMasterMap");
    mapField.setAccessible(true);
    @SuppressWarnings("unchecked")
    Map<Long, JobMaster> runningJobMasterMap =
            (Map<Long, JobMaster>) mapField.get(coordinatorService);
    runningJobMasterMap.remove(jobInformation.jobId);
    // Create a failed future to simulate restore failure
    CompletableFuture<Void> failedFuture = new CompletableFuture<>();
    failedFuture.completeExceptionally(new SeaTunnelEngineException("Restore failed"));
    PassiveCompletableFuture<Void> failedPassiveFuture =
            new PassiveCompletableFuture<>(failedFuture);
    Field futureField =
            CoordinatorService.class.getDeclaredField("restoreAllJobFromMasterNodeSwitchFuture");
    futureField.setAccessible(true);
    futureField.set(coordinatorService, failedPassiveFuture);
    // Try to update task state
    TaskGroupLocation fakeLocation = new TaskGroupLocation(jobInformation.jobId, 0, 0);
    TaskExecutionState taskExecutionState =
            new TaskExecutionState(fakeLocation, ExecutionState.FAILED);
    // Should throw JobNotFoundException even when restore failed
    Assertions.assertThrows(
            JobNotFoundException.class,
            () -> coordinatorService.updateTaskExecutionState(taskExecutionState));
    jobInformation.coordinatorServiceTest.shutdown();
}

@Test
void testUpdateTaskExecutionStateWhenJobRemovedDuringRestore() throws Exception {
    JobInformation jobInformation =
            submitJob(
                    "CoordinatorServiceTest_testUpdateTaskExecutionStateWhenJobRemovedDuringRestore",
                    "batch_fake_to_console.conf",
                    "test_update_task_execution_state_job_removed_during_restore");
    CoordinatorService coordinatorService = jobInformation.coordinatorService;
    await().atMost(30000, TimeUnit.MILLISECONDS)
            .untilAsserted(
                    () ->
                            Assertions.assertEquals(
                                    JobStatus.RUNNING,
                                    coordinatorService.getJobStatus(jobInformation.jobId)));
    Field mapField = CoordinatorService.class.getDeclaredField("runningJobMasterMap");
    mapField.setAccessible(true);
    @SuppressWarnings("unchecked")
    Map<Long, JobMaster> runningJobMasterMap =
            (Map<Long, JobMaster>) mapField.get(coordinatorService);
    // Remove job from map but don't add it back (simulating job cancellation during restore)
    Long jobId = jobInformation.jobId;
    runningJobMasterMap.remove(jobId);
    // Create a future that completes but doesn't restore the job
    CompletableFuture<Void> completedFuture = CompletableFuture.completedFuture(null);
    PassiveCompletableFuture<Void> completedPassiveFuture =
            new PassiveCompletableFuture<>(completedFuture);
    Field futureField =
            CoordinatorService.class.getDeclaredField("restoreAllJobFromMasterNodeSwitchFuture");
    futureField.setAccessible(true);
    futureField.set(coordinatorService, completedPassiveFuture);
    // Try to update task state
    TaskGroupLocation fakeLocation = new TaskGroupLocation(jobId, 0, 0);
    TaskExecutionState taskExecutionState =
            new TaskExecutionState(fakeLocation, ExecutionState.FAILED);
    // Should throw JobNotFoundException
    Assertions.assertThrows(
            JobNotFoundException.class,
            () -> coordinatorService.updateTaskExecutionState(taskExecutionState));
    jobInformation.coordinatorServiceTest.shutdown();
}
```
**Rationale**:
- Cover restore failure scenarios, verify exception handling correctness
- Cover scenario where restore completes but job still doesn't exist
- Improve test coverage, prevent future regression
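A concurrency test would need the real `CoordinatorService`, but the core pattern it should exercise, many callers parked on one shared restore future and released together, can be sketched standalone (all names here are illustrative, not SeaTunnel APIs):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ConcurrentJoinDemo {
    public static void main(String[] args) throws Exception {
        CompletableFuture<Void> restoreFuture = new CompletableFuture<>();
        int callers = 8; // simulated concurrent status-update threads
        CountDownLatch allStarted = new CountDownLatch(callers);
        AtomicInteger completed = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(callers);
        for (int i = 0; i < callers; i++) {
            pool.submit(() -> {
                allStarted.countDown();
                restoreFuture.join(); // every caller blocks on the same restore future
                completed.incrementAndGet();
            });
        }
        allStarted.await();
        restoreFuture.complete(null); // "restore" finishes; all callers proceed
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println(completed.get()); // 8
    }
}
```

A real test would additionally assert that every thread either got the `JobMaster` or threw `JobNotFoundException`, with no thread left blocked.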
---
### Issue 4: Potential risk of Hazelcast Operation thread blocking
**Location**: `CoordinatorService.java:898-901`
```java
if (runningJobMaster == null) {
    restoreAllJobFromMasterNodeSwitchFuture.join(); // blocks a Hazelcast Operation thread
    runningJobMaster = runningJobMasterMap.get(taskGroupLocation.getJobId());
}
```
**Related context**:
- `NotifyTaskStatusOperation.runInternal()` (lines 71-74): This method
executes in Hazelcast's Operation thread
- `restoreAllRunningJobFromMasterNodeSwitch()` (lines 458-466): Restore
process includes `Thread.sleep(1000)` loop waiting for worker registration
**Issue description**:
The `updateTaskExecutionState()` method is called through
`NotifyTaskStatusOperation`, which executes in Hazelcast's Operation thread.
When executing `restoreAllJobFromMasterNodeSwitchFuture.join()`, it blocks the
Operation thread.
If the restore process takes a long time (e.g., waiting for worker
registration), it will block Hazelcast's Operation thread, potentially
affecting:
1. **Cluster communication**: Hazelcast's partition operations and RPC calls
may be delayed
2. **Heartbeat detection**: If blocked for too long, nodes may be
incorrectly determined as down
3. **Other operations**: Other Operations may be queued waiting
**Potential risks**:
- **Risk 1**: After master switch, if a large number of workers concurrently
report status, multiple Operation threads may be blocked
- **Risk 2**: If restore process includes `Thread.sleep()` (lines 458-466),
blocking time may reach several seconds or longer
- **Risk 3**: Hazelcast's Operation thread pool size is limited (default
value is usually small), long blocking may cause thread pool exhaustion
**Impact scope**:
- **Direct impact**: Hazelcast cluster communication performance
- **Indirect impact**: Other operations depending on Hazelcast RPC
- **Affected area**: Communication stability of the entire cluster
**Severity**: **MAJOR** (depends on Hazelcast thread pool configuration and
restore duration)
**Improvement suggestion**:
There are several possible mitigations; they differ in how much architectural
change they require:
**Solution A: Async processing (recommended for long-term optimization)**
```java
// In NotifyTaskStatusOperation
@Override
public void runInternal() throws Exception {
    SeaTunnelServer server = getService();
    // Use async processing to avoid blocking the Operation thread
    CompletableFuture.runAsync(
            () -> server.getCoordinatorService().updateTaskExecutionState(taskExecutionState),
            server.getCoordinatorService().getExecutorService());
}
```
**Solution B: Check instead of wait (lighter-weight solution)**
```java
public void updateTaskExecutionState(TaskExecutionState taskExecutionState) {
    logger.info(
            String.format(
                    "Received task end from execution %s, state %s",
                    taskExecutionState.getTaskGroupLocation(),
                    taskExecutionState.getExecutionState()));
    TaskGroupLocation taskGroupLocation = taskExecutionState.getTaskGroupLocation();
    JobMaster runningJobMaster = runningJobMasterMap.get(taskGroupLocation.getJobId());
    // Check whether restore is still in progress
    if (runningJobMaster == null && !restoreAllJobFromMasterNodeSwitchFuture.isDone()) {
        // Restore still in progress, return early and let the worker retry.
        // This is acceptable because the worker already has retry logic.
        logger.info(
                String.format(
                        "Job %s not found and restore still in progress, worker will retry",
                        taskGroupLocation.getJobId()));
        throw new JobNotFoundException(
                String.format(
                        "Job %s not running (restore in progress)",
                        taskGroupLocation.getJobId()));
    }
    if (runningJobMaster == null) {
        throw new JobNotFoundException(
                String.format("Job %s not running", taskGroupLocation.getJobId()));
    }
    runningJobMaster.updateTaskExecutionState(taskExecutionState);
}
```
**Rationale**:
- **Solution A**: Transfer status updates to `CoordinatorService`'s thread
pool, avoiding blocking Hazelcast Operation threads
- **Solution B**: Don't wait for restore completion, directly throw
`JobNotFoundException`, let worker retry. This leverages existing retry logic
and avoids blocking
**Trade-offs**:
- Solution A requires modifying `NotifyTaskStatusOperation`, with a larger
impact surface
- Solution B may lead to more retries, but won't block Operation threads
- **The current PR chooses the waiting solution**, which is a simple fix,
but requires evaluating whether the Hazelcast thread pool is large enough to
handle potential blocking
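A third, purely hypothetical variant (not proposed in this PR) keeps the waiting solution but bounds it, capping the worst-case Operation-thread blocking. The sketch below substitutes `get(timeout)` for `join()`; the timeout value and fallback behavior are assumptions:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BoundedWaitDemo {
    public static void main(String[] args) throws Exception {
        // Simulates a restore that is still in progress (future never completes here)
        CompletableFuture<Void> restoreFuture = new CompletableFuture<>();
        try {
            // Bounded wait instead of an unbounded join(): blocks at most 100 ms
            restoreFuture.get(100, TimeUnit.MILLISECONDS);
            System.out.println("restore finished in time");
        } catch (TimeoutException e) {
            // On timeout, fall back to the worker-side retry path, e.g. by
            // throwing JobNotFoundException("restore in progress") as in Solution B
            System.out.println("restore still in progress, let worker retry");
        }
    }
}
```

This trades a small amount of added latency for a hard upper bound on how long any single Operation thread can be held.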
---
### Issue 5: Jobs in PendingJobQueue state not considered
**Location**: `CoordinatorService.java:897-906`
```java
JobMaster runningJobMaster = runningJobMasterMap.get(taskGroupLocation.getJobId());
if (runningJobMaster == null) {
    restoreAllJobFromMasterNodeSwitchFuture.join();
    runningJobMaster = runningJobMasterMap.get(taskGroupLocation.getJobId());
}
```
**Related context**:
- `getJobMaster()` method (lines 392-398) checks both `pendingJobQueue` and
`runningJobMasterMap`
- `pendingJobSchedule()` method (lines 250-328): Jobs are scheduled from
pending queue to running map
**Issue description**:
When `runningJobMaster` is null, the code only waits for restore completion,
then queries `runningJobMasterMap` again. However, it doesn't check
`pendingJobQueue`.
Although the restore process normally places jobs into `pendingJobQueue` and
then schedules them into `runningJobMasterMap`, the following race is
theoretically possible:
1. Job is restored to `pendingJobQueue`
2. Worker calls `updateTaskExecutionState()`
3. At this time, `runningJobMasterMap` still doesn't have this job (because
scheduling hasn't occurred yet)
4. `.join()` returns (because restore completed)
5. Second query of `runningJobMasterMap` is still null
6. Throw `JobNotFoundException`
**Potential risks**:
- **Risk 1**: Status updates will fail when restore completes but job hasn't
been scheduled from pending queue to running map yet
- **Risk 2**: This time window is very short, but theoretically exists
**Impact scope**:
- **Direct impact**: Accuracy of `updateTaskExecutionState()` method
- **Indirect impact**: Reliability of job status updates
- **Affected area**: All jobs immediately restored after master switch
**Severity**: **MINOR** (because scheduling is usually fast after restore
completes, and worker has retry mechanism)
**Improvement suggestion**:
```java
public void updateTaskExecutionState(TaskExecutionState taskExecutionState) {
    logger.info(
            String.format(
                    "Received task end from execution %s, state %s",
                    taskExecutionState.getTaskGroupLocation(),
                    taskExecutionState.getExecutionState()));
    TaskGroupLocation taskGroupLocation = taskExecutionState.getTaskGroupLocation();
    // Try to get the job master from the running map first
    JobMaster runningJobMaster = runningJobMasterMap.get(taskGroupLocation.getJobId());
    // If not found, check whether restore is in progress or the job is in the pending queue
    if (runningJobMaster == null) {
        // Check if the job is in the pending queue
        PendingJobInfo pendingJobInfo = pendingJobQueue.getById(taskGroupLocation.getJobId());
        if (pendingJobInfo != null) {
            // Job is in the pending queue, use its job master
            runningJobMaster = pendingJobInfo.getJobMaster();
        } else {
            // Not in the pending queue, wait for restore to complete
            restoreAllJobFromMasterNodeSwitchFuture.join();
            runningJobMaster = runningJobMasterMap.get(taskGroupLocation.getJobId());
            // Check again after restore
            if (runningJobMaster == null) {
                pendingJobInfo = pendingJobQueue.getById(taskGroupLocation.getJobId());
                if (pendingJobInfo != null) {
                    runningJobMaster = pendingJobInfo.getJobMaster();
                }
            }
        }
    }
    if (runningJobMaster == null) {
        throw new JobNotFoundException(
                String.format("Job %s not running", taskGroupLocation.getJobId()));
    }
    runningJobMaster.updateTaskExecutionState(taskExecutionState);
}
```
**Rationale**:
- Keep consistent with `getJobMaster()` method, check both `pendingJobQueue`
and `runningJobMasterMap`
- Cover scenarios where jobs are in pending queue
- Avoid status update failure when restore completes but scheduling hasn't
completed
**Trade-offs**:
- Increases code complexity
- Need to evaluate whether `JobMaster` obtained from `PendingJobInfo` can
correctly handle `updateTaskExecutionState` calls
- The existing `getJobMaster()` already returns the jobMaster held by
`PendingJobInfo`, which suggests this usage is legitimate
---
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]