dybyte opened a new pull request, #10506:
URL: https://github.com/apache/seatunnel/pull/10506

   ### Purpose of this pull request
   
   When `CoordinatorService` is restored after a master node switch, `CoordinatorService.restoreAllRunningJobFromMasterNodeSwitch()` runs asynchronously.
   If `CoordinatorService.updateTaskExecutionState()` is invoked while this restore is still in progress, `runningJobMasterMap` may not be fully repopulated yet, and the call fails with a `JobNotFoundException` even though the job is still running.
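   The race can be reproduced in isolation with plain Java. The sketch below is a simplified model, not SeaTunnel code. It assumes the following stand-ins:
   - `runningJobMasterMap` is a `ConcurrentHashMap<Long, String>`.
   - The asynchronous restore is a `CompletableFuture` deliberately held back by a `CountDownLatch`.
   - `updateTaskExecutionState` is a hypothetical helper that throws `IllegalStateException` in place of `JobNotFoundException`.

   ```java
   import java.util.Map;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.concurrent.CountDownLatch;

   public class RestoreRaceDemo {
       // Simplified stand-in for CoordinatorService.runningJobMasterMap (jobId -> job master name).
       static final Map<Long, String> runningJobMasterMap = new ConcurrentHashMap<>();

       // Hypothetical stand-in for the lookup in updateTaskExecutionState(): fail fast if the job is unknown.
       // IllegalStateException plays the role of JobNotFoundException here.
       static String updateTaskExecutionState(long jobId) {
           String master = runningJobMasterMap.get(jobId);
           if (master == null) {
               throw new IllegalStateException("Job " + jobId + " not running");
           }
           return master;
       }

       public static void main(String[] args) throws Exception {
           CountDownLatch allowRestore = new CountDownLatch(1);

           // Asynchronous restore, loosely modelled on restoreAllRunningJobFromMasterNodeSwitch().
           CompletableFuture<Void> restore = CompletableFuture.runAsync(() -> {
               try {
                   allowRestore.await(); // hold the restore back so the race is deterministic
               } catch (InterruptedException e) {
                   Thread.currentThread().interrupt();
                   return;
               }
               runningJobMasterMap.put(1L, "jobMaster-1");
           });

           // A worker reports a task state before the restore has finished: the lookup misses.
           try {
               updateTaskExecutionState(1L);
               System.out.println("unexpected: job found");
           } catch (IllegalStateException e) {
               System.out.println("before restore: " + e.getMessage());
           }

           // Let the restore finish, then the same lookup succeeds.
           allowRestore.countDown();
           restore.get();
           System.out.println("after restore: " + updateTaskExecutionState(1L));
       }
   }
   ```

   Holding the restore behind a latch only makes the interleaving reproducible. In the real service, the window depends on how long the restore takes.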
   
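   This is especially harmful when a worker node reports a terminal task state to the master node.
   As `notifyTaskStatusToMaster()` below shows, the worker treats a `JobNotFoundException` as a successful notification and stops retrying. The terminal state is therefore dropped (only a warning is logged), and the corresponding job may never be marked as completed.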
   
   ```java
   private void notifyTaskStatusToMaster(
           TaskGroupLocation taskGroupLocation, TaskExecutionState taskExecutionState) {
       long sleepTime = 1000;
       boolean notifyStateSuccess = false;
       while (isRunning && !notifyStateSuccess) {
           InvocationFuture<Object> invoke =
                   nodeEngine
                           .getOperationService()
                           .createInvocationBuilder(
                                   SeaTunnelServer.SERVICE_NAME,
                                   new NotifyTaskStatusOperation(
                                           taskGroupLocation, taskExecutionState),
                                   nodeEngine.getMasterAddress())
                           .invoke();
           try {
               invoke.get();
               notifyStateSuccess = true;
           } catch (InterruptedException e) {
               logger.severe("send notify task status failed", e);
           } catch (JobNotFoundException e) {
               // If updateTaskExecutionState() throws JobNotFoundException,
               // this notification is ignored and never retried.
               logger.warning("send notify task status failed because can't find job", e);
               notifyStateSuccess = true;
           } catch (ExecutionException e) {
               if (e.getCause() instanceof JobNotFoundException) {
                   // Same as above: a wrapped JobNotFoundException also ends the retry loop.
                   logger.warning("send notify task status failed because can't find job", e);
                   notifyStateSuccess = true;
               } else {
                   logger.warning(ExceptionUtils.getMessage(e));
                   logger.warning(
                           String.format(
                                   "notify the job of the task(%s) status failed, retry in %s millis",
                                   taskGroupLocation, sleepTime));
                   try {
                       Thread.sleep(sleepTime);
                   } catch (InterruptedException ex) {
                       logger.severe(e);
                   }
               }
           }
       }
   }
   ```
   
   ```java
   public void updateTaskExecutionState(TaskExecutionState taskExecutionState) {
       logger.info(
               String.format(
                       "Received task end from execution %s, state %s",
                       taskExecutionState.getTaskGroupLocation(),
                       taskExecutionState.getExecutionState()));
       TaskGroupLocation taskGroupLocation = taskExecutionState.getTaskGroupLocation();
       JobMaster runningJobMaster = runningJobMasterMap.get(taskGroupLocation.getJobId());
       if (runningJobMaster == null) {
           throw new JobNotFoundException(
                   String.format("Job %s not running", taskGroupLocation.getJobId()));
       }
       runningJobMaster.updateTaskExecutionState(taskExecutionState);
   }
   ```
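   One way to close the gap is to make the status update wait for the restore to complete before looking up the job. The exception is then thrown only if the job is still missing afterwards. The sketch below is a minimal, hypothetical illustration of that idea under stated assumptions, not the actual patch in this PR. `GatedCoordinator`, `restoreFuture`, `restoreAsync` and `RESTORE_WAIT_MILLIS` are invented names, and `IllegalStateException` stands in for `JobNotFoundException`.

   ```java
   import java.util.Map;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.concurrent.ExecutionException;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.TimeoutException;

   public class GatedCoordinator {
       // Hypothetical upper bound on how long a status report waits for the restore.
       static final long RESTORE_WAIT_MILLIS = 5_000;

       final Map<Long, String> runningJobMasterMap = new ConcurrentHashMap<>();
       // Hypothetical gate: completed once the asynchronous restore has repopulated the map.
       final CompletableFuture<Void> restoreFuture = new CompletableFuture<>();

       // Hypothetical stand-in for restoreAllRunningJobFromMasterNodeSwitch().
       void restoreAsync(Map<Long, String> recovered) {
           CompletableFuture.runAsync(() -> runningJobMasterMap.putAll(recovered))
                   .whenComplete((ignored, error) -> {
                       if (error != null) {
                           restoreFuture.completeExceptionally(error);
                       } else {
                           restoreFuture.complete(null);
                       }
                   });
       }

       // The checked exceptions signal "restore not finished / failed" and are meant to be retried;
       // IllegalStateException (standing in for JobNotFoundException) means the job is really gone.
       String updateTaskExecutionState(long jobId)
               throws InterruptedException, ExecutionException, TimeoutException {
           // Block until the restore has finished before consulting the map.
           restoreFuture.get(RESTORE_WAIT_MILLIS, TimeUnit.MILLISECONDS);
           String master = runningJobMasterMap.get(jobId);
           if (master == null) {
               throw new IllegalStateException("Job " + jobId + " not running");
           }
           return master;
       }

       public static void main(String[] args) throws Exception {
           GatedCoordinator coordinator = new GatedCoordinator();
           coordinator.restoreAsync(Map.of(1L, "jobMaster-1"));
           // Called right after the restore starts, yet it still sees the restored job.
           System.out.println(coordinator.updateTaskExecutionState(1L));
           try {
               coordinator.updateTaskExecutionState(2L);
           } catch (IllegalStateException e) {
               System.out.println(e.getMessage());
           }
       }
   }
   ```

   Keeping the timeout path separate from the not-found path matters because `notifyTaskStatusToMaster()` stops retrying as soon as it sees `JobNotFoundException`. Only a genuinely missing job should surface as that exception.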
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   No.
   
   ### How was this patch tested?
   
   - `testUpdateTaskExecutionStateWaitsForRestoreCompletion()`
   - `testUpdateTaskExecutionStateThrowsWhenJobNotFound()`
   
   ### Check list
   
   * [ ] If any new JAR binary package is added in your PR, please add the License Notice according to the
     [New License Guide](https://github.com/apache/seatunnel/blob/dev/docs/en/contribution/new-license.md)
   * [ ] If necessary, please update the documentation to describe the new feature. https://github.com/apache/seatunnel/tree/dev/docs
   * [ ] If necessary, please update `incompatible-changes.md` to describe the incompatibility caused by this PR.
   * [ ] If you are contributing the connector code, please check that the following files are updated:
     1. Update [plugin-mapping.properties](https://github.com/apache/seatunnel/blob/dev/plugin-mapping.properties) and add new connector information in it
     2. Update the pom file of [seatunnel-dist](https://github.com/apache/seatunnel/blob/dev/seatunnel-dist/pom.xml)
     3. Add the CI label in [label-scope-conf](https://github.com/apache/seatunnel/blob/dev/.github/workflows/labeler/label-scope-conf.yml)
     4. Add an e2e test case in [seatunnel-e2e](https://github.com/apache/seatunnel/tree/dev/seatunnel-e2e/seatunnel-connector-v2-e2e/)
     5. Update connector [plugin_config](https://github.com/apache/seatunnel/blob/dev/config/plugin_config)

