dybyte opened a new pull request, #10506:
URL: https://github.com/apache/seatunnel/pull/10506
### Purpose of this pull request
When `CoordinatorService` is restored,
`CoordinatorService.restoreAllRunningJobFromMasterNodeSwitch()` is executed
asynchronously.
If `CoordinatorService.updateTaskExecutionState()` is invoked during this
process, the `runningJobMasterMap` may not yet be fully restored, which can
result in a `JobNotFoundException`.
This behavior is particularly critical when a worker node reports a terminal
state to the master node.
If the master fails to receive the terminal state, the corresponding job may
never be marked as completed.
```
private void notifyTaskStatusToMaster(
TaskGroupLocation taskGroupLocation, TaskExecutionState
taskExecutionState) {
long sleepTime = 1000;
boolean notifyStateSuccess = false;
while (isRunning && !notifyStateSuccess) {
InvocationFuture<Object> invoke =
nodeEngine
.getOperationService()
.createInvocationBuilder(
SeaTunnelServer.SERVICE_NAME,
new NotifyTaskStatusOperation(
taskGroupLocation,
taskExecutionState),
nodeEngine.getMasterAddress())
.invoke();
try {
invoke.get();
notifyStateSuccess = true;
} catch (InterruptedException e) {
logger.severe("send notify task status failed", e);
} catch (JobNotFoundException e) {
logger.warning("send notify task status failed because can't
find job", e);
notifyStateSuccess = true;
// If updateTaskExecutionState() throws JobNotFoundException, this operation
is ignored
} catch (ExecutionException e) {
if (e.getCause() instanceof JobNotFoundException) {
logger.warning("send notify task status failed because
can't find job", e);
notifyStateSuccess = true;
} else {
logger.warning(ExceptionUtils.getMessage(e));
logger.warning(
String.format(
"notify the job of the task(%s) status
failed, retry in %s millis",
taskGroupLocation, sleepTime));
try {
Thread.sleep(sleepTime);
} catch (InterruptedException ex) {
logger.severe(e);
}
}
}
}
}
```
```
public void updateTaskExecutionState(TaskExecutionState taskExecutionState)
{
logger.info(
String.format(
"Received task end from execution %s, state %s",
taskExecutionState.getTaskGroupLocation(),
taskExecutionState.getExecutionState()));
TaskGroupLocation taskGroupLocation =
taskExecutionState.getTaskGroupLocation();
JobMaster runningJobMaster =
runningJobMasterMap.get(taskGroupLocation.getJobId());
if (runningJobMaster == null) {
throw new JobNotFoundException(
String.format("Job %s not running",
taskGroupLocation.getJobId()));
}
runningJobMaster.updateTaskExecutionState(taskExecutionState);
}
```
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
- `testUpdateTaskExecutionStateWaitsForRestoreCompletion()`
- `testUpdateTaskExecutionStateThrowsWhenJobNotFound()`
### Check list
* [ ] If any new Jar binary package adding in your PR, please add License
Notice according
[New License
Guide](https://github.com/apache/seatunnel/blob/dev/docs/en/contribution/new-license.md)
* [ ] If necessary, please update the documentation to describe the new
feature. https://github.com/apache/seatunnel/tree/dev/docs
* [ ] If necessary, please update `incompatible-changes.md` to describe the
incompatibility caused by this PR.
* [ ] If you are contributing the connector code, please check that the
following files are updated:
1. Update
[plugin-mapping.properties](https://github.com/apache/seatunnel/blob/dev/plugin-mapping.properties)
and add new connector information in it
2. Update the pom file of
[seatunnel-dist](https://github.com/apache/seatunnel/blob/dev/seatunnel-dist/pom.xml)
3. Add ci label in
[label-scope-conf](https://github.com/apache/seatunnel/blob/dev/.github/workflows/labeler/label-scope-conf.yml)
4. Add e2e testcase in
[seatunnel-e2e](https://github.com/apache/seatunnel/tree/dev/seatunnel-e2e/seatunnel-connector-v2-e2e/)
5. Update connector
[plugin_config](https://github.com/apache/seatunnel/blob/dev/config/plugin_config)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]