Hisoka-X commented on code in PR #9749:
URL: https://github.com/apache/seatunnel/pull/9749#discussion_r2342778297
##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java:
##########
@@ -317,21 +311,22 @@ private void pendingJobSchedule() throws
InterruptedException {
"The %s %s is in %s state, restore
pipeline and take over this job running",
pendingSourceState, jobFullName,
jobStatus));
- pendingJobMasterMap.remove(jobId);
- runningJobMasterMap.put(jobId, jobMaster);
- jobMaster.run();
+ runningJobMasterMap.put(jobId, finalJobMaster);
+ finalJobMaster.run();
} finally {
- if (jobMasterCompletedSuccessfully(jobMaster,
pendingSourceState)) {
+ if (jobMasterCompletedSuccessfully(finalJobMaster,
pendingSourceState)) {
runningJobMasterMap.remove(jobId);
}
}
});
}
private void queueRemove(JobMaster jobMaster) throws InterruptedException {
- JobMaster take = pendingJob.take();
- if (take != jobMaster) {
+ PendingJobInfo take = pendingJobQueue.peekBlocking();
+ if (take.getJobMaster() != jobMaster) {
logger.severe("The job master is not equal to the peek job
master");
+ } else {
+ pendingJobQueue.removeById(jobMaster.getJobId());
Review Comment:
Why use `peekBlocking`? It's does not remove item from queue. So when
`take.getJobMaster() != jobMaster` is true. No jobmaster will be removed?
##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java:
##########
@@ -569,11 +566,13 @@ public synchronized void clearCoordinatorService() {
// interrupt all JobMaster
runningJobMasterMap.values().forEach(JobMaster::interrupt);
if (isWaitStrategy) {
- pendingJobMasterMap.values().stream()
- .filter(Objects::nonNull)
- .map(Tuple2::_2)
- .forEach(JobMaster::interrupt);
- pendingJobMasterMap.clear();
+ pendingJobQueue
+ .getJobIdMap()
+ .forEach(
+ (jobId, pendingJobInfo) -> {
+ pendingJobInfo.getJobMaster().interrupt();
+ pendingJobQueue.removeById(jobId);
+ });
Review Comment:
invoke `removeById` in `forEach`? I'm not sure it's ok in this case.
How about do clear after forEach, just like before.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]