DanielCarter-stack commented on PR #10506:
URL: https://github.com/apache/seatunnel/pull/10506#issuecomment-3941608493

   <!-- code-pr-reviewer -->
    <!-- cpr:pr_reply_v2_parts {"group": "apache/seatunnel#10506", "part": 1, "total": 1} -->
   ### Issue 1: Improper exception handling when Restore fails
   
   **Location**: `CoordinatorService.java:898-901`
   
   ```java
   if (runningJobMaster == null) {
        restoreAllJobFromMasterNodeSwitchFuture.join();  // May throw CompletionException
       runningJobMaster = runningJobMasterMap.get(taskGroupLocation.getJobId());
   }
   ```
   
   **Related context**:
   - `restoreAllRunningJobFromMasterNodeSwitch()` (lines 449-504): Throws 
`SeaTunnelEngineException` on exception
   - `TaskExecutionService.notifyTaskStatusToMaster()` (lines 442-480): Expects 
to catch `JobNotFoundException`
   
   **Issue description**:
    If `restoreAllJobFromMasterNodeSwitchFuture` completes exceptionally because restore failed, `.join()` throws a `CompletionException` (wrapping the `SeaTunnelEngineException`) instead of the expected `JobNotFoundException`. This leads to:
    
    1. Worker-side retry logic becomes ineffective: `notifyTaskStatusToMaster()` only catches `JobNotFoundException`; a `CompletionException` falls into the generic exception branch and keeps retrying
    2. Log pollution: the repeated retries generate a large volume of warning logs
    3. Semantic confusion: "restore failed" and "job not found" are two different errors and should be handled separately
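    The wrapping behavior is standard `CompletableFuture` semantics and is easy to reproduce in isolation. A minimal sketch (`RestoreFailedException` here is a stand-in for `SeaTunnelEngineException`):
    
    ```java
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.CompletionException;
    
    public class JoinWrappingDemo {
        // Stand-in for SeaTunnelEngineException
        static class RestoreFailedException extends RuntimeException {
            RestoreFailedException(String msg) { super(msg); }
        }
    
        static String classifyJoinFailure() {
            CompletableFuture<Void> restoreFuture = new CompletableFuture<>();
            restoreFuture.completeExceptionally(new RestoreFailedException("restore failed"));
            try {
                restoreFuture.join();
                return "completed normally";
            } catch (CompletionException e) {
                // join() wraps the original failure cause in CompletionException
                return e.getClass().getSimpleName() + " wrapping "
                        + e.getCause().getClass().getSimpleName();
            }
        }
    
        public static void main(String[] args) {
            System.out.println(classifyJoinFailure());
            // prints: CompletionException wrapping RestoreFailedException
        }
    }
    ```
    
    Because the thrown type is `CompletionException`, a `catch (JobNotFoundException e)` on the worker side can never match it.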
   
   **Potential risks**:
    - **Risk 1**: After a restore failure, every task status update fails and is retried indefinitely, degrading system stability
    - **Risk 2**: The error logs mislead operators: the symptom looks like "job not found" while the actual cause is "restore failed"
   
   **Impact scope**:
   - **Direct impact**: `CoordinatorService.updateTaskExecutionState()` method
   - **Indirect impact**: `TaskExecutionService.notifyTaskStatusToMaster()` 
retry logic
   - **Affected area**: Zeta core framework, all jobs in scenarios where master 
switches and restore fails
   
   **Severity**: **MAJOR**
   
   **Improvement suggestion**:
    ```java
    // CoordinatorService.java:890-907
    public void updateTaskExecutionState(TaskExecutionState taskExecutionState) {
        logger.info(
                String.format(
                        "Received task end from execution %s, state %s",
                        taskExecutionState.getTaskGroupLocation(),
                        taskExecutionState.getExecutionState()));
        TaskGroupLocation taskGroupLocation = taskExecutionState.getTaskGroupLocation();
        JobMaster runningJobMaster = runningJobMasterMap.get(taskGroupLocation.getJobId());
        if (runningJobMaster == null) {
            try {
                // Wait for job restoration to complete during master node switch
                restoreAllJobFromMasterNodeSwitchFuture.join();
            } catch (Exception e) {
                // If restore failed, treat it as job not found and log the error
                logger.warning(
                        String.format(
                                "Failed to wait for restore completion for job %s: %s",
                                taskGroupLocation.getJobId(),
                                ExceptionUtils.getMessage(e)));
                throw new JobNotFoundException(
                        String.format(
                                "Job %s not running (restore failed)",
                                taskGroupLocation.getJobId()),
                        e);
            }
            runningJobMaster = runningJobMasterMap.get(taskGroupLocation.getJobId());
        }
        if (runningJobMaster == null) {
            throw new JobNotFoundException(
                    String.format("Job %s not running", taskGroupLocation.getJobId()));
        }
        runningJobMaster.updateTaskExecutionState(taskExecutionState);
    }
    ```
   
   **Rationale**:
   - Catch all exceptions that `.join()` may throw, convert to 
`JobNotFoundException`
   - Keep worker-side retry logic working normally
   - Clearly distinguish between "restore failed" and "job not found" in logs
   
   ---
   
   ### Issue 2: Missing critical logs and comments
   
   **Location**: `CoordinatorService.java:898-901`
   
   ```java
   if (runningJobMaster == null) {
       restoreAllJobFromMasterNodeSwitchFuture.join();
       runningJobMaster = runningJobMasterMap.get(taskGroupLocation.getJobId());
   }
   ```
   
   **Related context**:
   - Method JavaDoc: None
   - Caller: `NotifyTaskStatusOperation.runInternal()` (lines 71-74)
   
   **Issue description**:
    The newly added waiting logic has no comment explaining it and emits no log when triggered. This leads to:
    
    1. **Poor maintainability**: Future reviewers and maintainers won't understand why the future must be joined here
    2. **Weak diagnosability**: When status updates are delayed by a master switch, the logs give no indication that the coordinator is waiting for restore
    3. **Difficult debugging**: When problems occur in production, there is no way to tell whether this waiting branch was entered
   
   **Potential risks**:
   - **Risk 1**: Future developers may mistakenly think this is an unnecessary 
block and remove it
   - **Risk 2**: During performance tuning, unable to accurately assess the 
impact of master switch on status updates
   
   **Impact scope**:
   - **Direct impact**: Code maintainability and observability
   - **Indirect impact**: Problem diagnosis efficiency
   - **Affected area**: Only affects 
`CoordinatorService.updateTaskExecutionState()` method
   
   **Severity**: **MINOR**
   
   **Improvement suggestion**:
    ```java
    public void updateTaskExecutionState(TaskExecutionState taskExecutionState) {
        logger.info(
                String.format(
                        "Received task end from execution %s, state %s",
                        taskExecutionState.getTaskGroupLocation(),
                        taskExecutionState.getExecutionState()));
        TaskGroupLocation taskGroupLocation = taskExecutionState.getTaskGroupLocation();
        JobMaster runningJobMaster = runningJobMasterMap.get(taskGroupLocation.getJobId());

        // During master node switch, jobs are being restored asynchronously.
        // If a job is not found in the runningJobMasterMap, wait for the restoration
        // to complete before throwing JobNotFoundException. This prevents false negatives
        // when workers report task status during the restore window.
        if (runningJobMaster == null) {
            logger.info(
                    String.format(
                            "Job %s not found in runningJobMasterMap, waiting for restore completion",
                            taskGroupLocation.getJobId()));
            restoreAllJobFromMasterNodeSwitchFuture.join();
            runningJobMaster = runningJobMasterMap.get(taskGroupLocation.getJobId());
            logger.info(
                    String.format(
                            "Restore completed, job %s %s",
                            taskGroupLocation.getJobId(),
                            runningJobMaster != null ? "found" : "still not found"));
        }

        if (runningJobMaster == null) {
            throw new JobNotFoundException(
                    String.format("Job %s not running", taskGroupLocation.getJobId()));
        }
        runningJobMaster.updateTaskExecutionState(taskExecutionState);
    }
    ```
   
   **Rationale**:
   - Clear comments explain why waiting is necessary
   - Logs can help diagnose status update behavior during master switch
   - Facilitates performance analysis and troubleshooting
   
   ---
   
   ### Issue 3: Insufficient test coverage: Restore failure scenarios
   
   **Location**: `CoordinatorServiceTest.java:472-562`
   
   **Related context**:
   - `testUpdateTaskExecutionStateWaitsForRestoreCompletion()` (lines 472-542)
   - `testUpdateTaskExecutionStateThrowsWhenJobNotFound()` (lines 545-562)
   
   **Issue description**:
   Existing tests only cover restore success scenarios, but not restore failure 
scenarios. Specifically:
   
   1. **Missing exceptional completion test**: No test for behavior when 
`restoreAllJobFromMasterNodeSwitchFuture` completes exceptionally
   2. **Edge cases**: No test for scenario where restore completes but job is 
still not found (e.g., job was cancelled during restore)
   3. **Concurrency scenarios**: No test for behavior when multiple threads 
call `updateTaskExecutionState()` simultaneously
   
   **Potential risks**:
   - **Risk 1**: The exception handling issue described in Issue 1 may not be 
discovered in tests and go directly to production
   - **Risk 2**: Future code changes may break this fix, and tests won't detect 
it
   
   **Impact scope**:
   - **Direct impact**: Test coverage
   - **Indirect impact**: Code quality assurance
   - **Affected area**: `CoordinatorService.updateTaskExecutionState()` method
   
   **Severity**: **MINOR**
   
   **Improvement suggestion**:
    ```java
    @Test
    void testUpdateTaskExecutionStateWhenRestoreFailed() throws Exception {
        JobInformation jobInformation =
                submitJob(
                        "CoordinatorServiceTest_testUpdateTaskExecutionStateWhenRestoreFailed",
                        "batch_fake_to_console.conf",
                        "test_update_task_execution_state_when_restore_failed");
        CoordinatorService coordinatorService = jobInformation.coordinatorService;
        await().atMost(30000, TimeUnit.MILLISECONDS)
                .untilAsserted(
                        () ->
                                Assertions.assertEquals(
                                        JobStatus.RUNNING,
                                        coordinatorService.getJobStatus(jobInformation.jobId)));

        // Remove job from runningJobMasterMap to simulate master switch
        Field mapField = CoordinatorService.class.getDeclaredField("runningJobMasterMap");
        mapField.setAccessible(true);
        @SuppressWarnings("unchecked")
        Map<Long, JobMaster> runningJobMasterMap =
                (Map<Long, JobMaster>) mapField.get(coordinatorService);
        runningJobMasterMap.remove(jobInformation.jobId);

        // Create a failed future to simulate restore failure
        CompletableFuture<Void> failedFuture = new CompletableFuture<>();
        failedFuture.completeExceptionally(new SeaTunnelEngineException("Restore failed"));
        PassiveCompletableFuture<Void> failedPassiveFuture =
                new PassiveCompletableFuture<>(failedFuture);

        Field futureField =
                CoordinatorService.class.getDeclaredField(
                        "restoreAllJobFromMasterNodeSwitchFuture");
        futureField.setAccessible(true);
        futureField.set(coordinatorService, failedPassiveFuture);

        // Try to update task state
        TaskGroupLocation fakeLocation = new TaskGroupLocation(jobInformation.jobId, 0, 0);
        TaskExecutionState taskExecutionState =
                new TaskExecutionState(fakeLocation, ExecutionState.FAILED);

        // Should throw JobNotFoundException even when restore failed
        Assertions.assertThrows(
                JobNotFoundException.class,
                () -> coordinatorService.updateTaskExecutionState(taskExecutionState));

        jobInformation.coordinatorServiceTest.shutdown();
    }

    @Test
    void testUpdateTaskExecutionStateWhenJobRemovedDuringRestore() throws Exception {
        JobInformation jobInformation =
                submitJob(
                        "CoordinatorServiceTest_testUpdateTaskExecutionStateWhenJobRemovedDuringRestore",
                        "batch_fake_to_console.conf",
                        "test_update_task_execution_state_job_removed_during_restore");
        CoordinatorService coordinatorService = jobInformation.coordinatorService;
        await().atMost(30000, TimeUnit.MILLISECONDS)
                .untilAsserted(
                        () ->
                                Assertions.assertEquals(
                                        JobStatus.RUNNING,
                                        coordinatorService.getJobStatus(jobInformation.jobId)));

        Field mapField = CoordinatorService.class.getDeclaredField("runningJobMasterMap");
        mapField.setAccessible(true);
        @SuppressWarnings("unchecked")
        Map<Long, JobMaster> runningJobMasterMap =
                (Map<Long, JobMaster>) mapField.get(coordinatorService);

        // Remove job from map but don't add it back (simulating job cancellation during restore)
        Long jobId = jobInformation.jobId;
        runningJobMasterMap.remove(jobId);

        // Create a future that completes but doesn't restore the job
        CompletableFuture<Void> completedFuture = CompletableFuture.completedFuture(null);
        PassiveCompletableFuture<Void> completedPassiveFuture =
                new PassiveCompletableFuture<>(completedFuture);

        Field futureField =
                CoordinatorService.class.getDeclaredField(
                        "restoreAllJobFromMasterNodeSwitchFuture");
        futureField.setAccessible(true);
        futureField.set(coordinatorService, completedPassiveFuture);

        // Try to update task state
        TaskGroupLocation fakeLocation = new TaskGroupLocation(jobId, 0, 0);
        TaskExecutionState taskExecutionState =
                new TaskExecutionState(fakeLocation, ExecutionState.FAILED);

        // Should throw JobNotFoundException
        Assertions.assertThrows(
                JobNotFoundException.class,
                () -> coordinatorService.updateTaskExecutionState(taskExecutionState));

        jobInformation.coordinatorServiceTest.shutdown();
    }
    ```
   
   **Rationale**:
   - Cover restore failure scenarios, verify exception handling correctness
   - Cover scenario where restore completes but job still doesn't exist
   - Improve test coverage, prevent future regression
   
   ---
   
   ### Issue 4: Potential risk of Hazelcast Operation thread blocking
   
   **Location**: `CoordinatorService.java:898-901`
   
   ```java
   if (runningJobMaster == null) {
        restoreAllJobFromMasterNodeSwitchFuture.join();  // Blocking in Hazelcast Operation thread
       runningJobMaster = runningJobMasterMap.get(taskGroupLocation.getJobId());
   }
   ```
   
   **Related context**:
   - `NotifyTaskStatusOperation.runInternal()` (lines 71-74): This method 
executes in Hazelcast's Operation thread
   - `restoreAllRunningJobFromMasterNodeSwitch()` (lines 458-466): Restore 
process includes `Thread.sleep(1000)` loop waiting for worker registration
   
   **Issue description**:
    The `updateTaskExecutionState()` method is invoked via `NotifyTaskStatusOperation`, which runs on a Hazelcast Operation thread, so calling `restoreAllJobFromMasterNodeSwitchFuture.join()` blocks that Operation thread.
    
    If the restore process takes a long time (e.g., while waiting for worker registration), the blocked Operation thread can affect:
   
   1. **Cluster communication**: Hazelcast's partition operations and RPC calls 
may be delayed
   2. **Heartbeat detection**: If blocked for too long, nodes may be 
incorrectly determined as down
   3. **Other operations**: Other Operations may be queued waiting
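    If the waiting approach is kept, the blocking time can at least be bounded. This is a sketch, not part of this PR, using only the plain `java.util.concurrent` API (the helper name `awaitRestore` is illustrative):
    
    ```java
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;
    
    public class BoundedWaitSketch {
        // Returns true if the restore future completed normally within the timeout;
        // false on timeout or restore failure, so the caller can throw
        // JobNotFoundException and let the worker's retry logic take over.
        static boolean awaitRestore(CompletableFuture<Void> restoreFuture, long timeoutMs) {
            try {
                restoreFuture.get(timeoutMs, TimeUnit.MILLISECONDS);
                return true;
            } catch (TimeoutException e) {
                // Still restoring: stop blocking the Operation thread
                return false;
            } catch (Exception e) {
                // Restore failed: treat the same as "not restored"
                return false;
            }
        }
    
        public static void main(String[] args) {
            System.out.println(awaitRestore(new CompletableFuture<>(), 50));               // false (timed out)
            System.out.println(awaitRestore(CompletableFuture.completedFuture(null), 50)); // true
        }
    }
    ```
    
    A bounded wait caps the worst-case Operation-thread stall at the timeout, at the cost of a few extra worker retries when restore runs long.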
   
   **Potential risks**:
   - **Risk 1**: After master switch, if a large number of workers concurrently 
report status, multiple Operation threads may be blocked
   - **Risk 2**: If restore process includes `Thread.sleep()` (lines 458-466), 
blocking time may reach several seconds or longer
   - **Risk 3**: Hazelcast's Operation thread pool size is limited (default 
value is usually small), long blocking may cause thread pool exhaustion
   
   **Impact scope**:
   - **Direct impact**: Hazelcast cluster communication performance
   - **Indirect impact**: Other operations depending on Hazelcast RPC
   - **Affected area**: Communication stability of the entire cluster
   
   **Severity**: **MAJOR** (depends on Hazelcast thread pool configuration and 
restore duration)
   
   **Improvement suggestion**:
   There are several possible optimization solutions, but all involve 
significant architectural changes:
   
   **Solution A: Async processing (recommended for long-term optimization)**
   ```java
   // In NotifyTaskStatusOperation
   @Override
   public void runInternal() throws Exception {
       SeaTunnelServer server = getService();
       // Use async processing to avoid blocking Operation thread
        CompletableFuture.runAsync(() -> {
            server.getCoordinatorService().updateTaskExecutionState(taskExecutionState);
        }, server.getCoordinatorService().getExecutorService());
   }
   ```
   
   **Solution B: Check instead of wait (lighter-weight solution)**
    ```java
    public void updateTaskExecutionState(TaskExecutionState taskExecutionState) {
        logger.info(
                String.format(
                        "Received task end from execution %s, state %s",
                        taskExecutionState.getTaskGroupLocation(),
                        taskExecutionState.getExecutionState()));
        TaskGroupLocation taskGroupLocation = taskExecutionState.getTaskGroupLocation();
        JobMaster runningJobMaster = runningJobMasterMap.get(taskGroupLocation.getJobId());

        // Check if restore is still in progress
        if (runningJobMaster == null
                && !restoreAllJobFromMasterNodeSwitchFuture.isDone()) {
            // Restore still in progress, return early and let worker retry.
            // This is acceptable because the worker already has retry logic.
            logger.info(
                    String.format(
                            "Job %s not found and restore still in progress, worker will retry",
                            taskGroupLocation.getJobId()));
            throw new JobNotFoundException(
                    String.format(
                            "Job %s not running (restore in progress)",
                            taskGroupLocation.getJobId()));
        }

        if (runningJobMaster == null) {
            throw new JobNotFoundException(
                    String.format("Job %s not running", taskGroupLocation.getJobId()));
        }
        runningJobMaster.updateTaskExecutionState(taskExecutionState);
    }
    ```
   
   **Rationale**:
   - **Solution A**: Transfer status updates to `CoordinatorService`'s thread 
pool, avoiding blocking Hazelcast Operation threads
   - **Solution B**: Don't wait for restore completion, directly throw 
`JobNotFoundException`, let worker retry. This leverages existing retry logic 
and avoids blocking
   
   **Trade-offs**:
   - Solution A requires modifying `NotifyTaskStatusOperation`, with a larger 
impact surface
   - Solution B may lead to more retries, but won't block Operation threads
   - **The current PR chooses the waiting solution**, which is a simple fix, 
but requires evaluating whether the Hazelcast thread pool is large enough to 
handle potential blocking
   
   ---
   
   ### Issue 5: Jobs in PendingJobQueue state not considered
   
   **Location**: `CoordinatorService.java:897-906`
   
   ```java
    JobMaster runningJobMaster = runningJobMasterMap.get(taskGroupLocation.getJobId());
   if (runningJobMaster == null) {
       restoreAllJobFromMasterNodeSwitchFuture.join();
       runningJobMaster = runningJobMasterMap.get(taskGroupLocation.getJobId());
   }
   ```
   
   **Related context**:
   - `getJobMaster()` method (lines 392-398) checks both `pendingJobQueue` and 
`runningJobMasterMap`
   - `pendingJobSchedule()` method (lines 250-328): Jobs are scheduled from 
pending queue to running map
   
   **Issue description**:
    When `runningJobMaster` is null, the code only waits for restore completion and then queries `runningJobMasterMap` again; it never checks `pendingJobQueue`.
    
    Although the restore process normally restores jobs into `pendingJobQueue` and then schedules them into `runningJobMasterMap`, the following race is theoretically possible:
    
    1. A job is restored into `pendingJobQueue`
    2. A worker calls `updateTaskExecutionState()`
    3. `runningJobMasterMap` does not yet contain the job, because scheduling has not happened
    4. `.join()` returns, because restore has completed
    5. The second lookup in `runningJobMasterMap` is still null
    6. `JobNotFoundException` is thrown
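    This window can be reproduced in miniature with two maps and a completed future (the names below are illustrative stand-ins, not the real SeaTunnel types):
    
    ```java
    import java.util.Map;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ConcurrentHashMap;
    
    public class RestoreWindowDemo {
        static String probe() {
            Map<Long, String> pendingJobQueue = new ConcurrentHashMap<>();
            Map<Long, String> runningJobMasterMap = new ConcurrentHashMap<>();
            CompletableFuture<Void> restoreFuture = new CompletableFuture<>();
    
            long jobId = 42L;
            // Step 1: restore places the job in the pending queue and completes,
            // but scheduling into runningJobMasterMap has not happened yet
            pendingJobQueue.put(jobId, "jobMaster");
            restoreFuture.complete(null);
    
            // Steps 2-5: a worker reports status; join() returns immediately,
            // yet the job is still absent from the running map
            restoreFuture.join();
            return "running=" + runningJobMasterMap.containsKey(jobId)
                    + ", pending=" + pendingJobQueue.containsKey(jobId);
        }
    
        public static void main(String[] args) {
            // Step 6: the current code would throw JobNotFoundException here,
            // even though the job exists in the pending queue
            System.out.println(probe());  // prints: running=false, pending=true
        }
    }
    ```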
   
   **Potential risks**:
   - **Risk 1**: Status updates will fail when restore completes but job hasn't 
been scheduled from pending queue to running map yet
   - **Risk 2**: This time window is very short, but theoretically exists
   
   **Impact scope**:
   - **Direct impact**: Accuracy of `updateTaskExecutionState()` method
   - **Indirect impact**: Reliability of job status updates
   - **Affected area**: All jobs immediately restored after master switch
   
   **Severity**: **MINOR** (because scheduling is usually fast after restore 
completes, and worker has retry mechanism)
   
   **Improvement suggestion**:
    ```java
    public void updateTaskExecutionState(TaskExecutionState taskExecutionState) {
        logger.info(
                String.format(
                        "Received task end from execution %s, state %s",
                        taskExecutionState.getTaskGroupLocation(),
                        taskExecutionState.getExecutionState()));
        TaskGroupLocation taskGroupLocation = taskExecutionState.getTaskGroupLocation();

        // Try to get job master from running map first
        JobMaster runningJobMaster = runningJobMasterMap.get(taskGroupLocation.getJobId());

        // If not found, check if restore is in progress or job is in pending queue
        if (runningJobMaster == null) {
            // Check if job is in pending queue
            PendingJobInfo pendingJobInfo = pendingJobQueue.getById(taskGroupLocation.getJobId());
            if (pendingJobInfo != null) {
                // Job is in pending queue, use its job master
                runningJobMaster = pendingJobInfo.getJobMaster();
            } else {
                // Not in pending queue, wait for restore to complete
                restoreAllJobFromMasterNodeSwitchFuture.join();
                runningJobMaster = runningJobMasterMap.get(taskGroupLocation.getJobId());

                // Check again after restore
                if (runningJobMaster == null) {
                    pendingJobInfo = pendingJobQueue.getById(taskGroupLocation.getJobId());
                    if (pendingJobInfo != null) {
                        runningJobMaster = pendingJobInfo.getJobMaster();
                    }
                }
            }
        }

        if (runningJobMaster == null) {
            throw new JobNotFoundException(
                    String.format("Job %s not running", taskGroupLocation.getJobId()));
        }
        runningJobMaster.updateTaskExecutionState(taskExecutionState);
    }
    ```
   
   **Rationale**:
   - Keep consistent with `getJobMaster()` method, check both `pendingJobQueue` 
and `runningJobMasterMap`
   - Cover scenarios where jobs are in pending queue
   - Avoid status update failure when restore completes but scheduling hasn't 
completed
   
    **Trade-offs**:
    - Increases code complexity
    - It must be verified that a `JobMaster` obtained from `PendingJobInfo` can correctly handle `updateTaskExecutionState` calls
    - The existing `getJobMaster()` already returns the jobMaster from `pendingJobInfo`, which suggests this usage is legitimate
   
   ---


-- 
This is an automated message from the Apache Git Service.