This is an automated email from the ASF dual-hosted git repository.
jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new 69d40c754f [MINOR] fix(core): Fix the race issue in `LocalJobExecutor`
(#7968)
69d40c754f is described below
commit 69d40c754f4d457c310b57c9c2e1e9d0d6ca0ef1
Author: Jerry Shao <[email protected]>
AuthorDate: Fri Aug 8 17:38:38 2025 +0800
[MINOR] fix(core): Fix the race issue in `LocalJobExecutor` (#7968)
### What changes were proposed in this pull request?
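Fix a race condition in `LocalJobExecutor`: adding a job to the waiting queue, and starting its process, now happen inside the same `synchronized (lock)` block that updates the job status.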
### Why are the changes needed?
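Previously a job was offered to the waiting queue before its `QUEUED` status was recorded under the lock, and the process was started outside the lock without checking that the job was still `QUEUED`. As a result, a job that was cancelled before it started could still be launched.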
### Does this PR introduce _any_ user-facing change?
N/A
### How was this patch tested?
N/A
---
.../gravitino/job/local/LocalJobExecutor.java | 29 ++++++++++++++--------
1 file changed, 19 insertions(+), 10 deletions(-)
diff --git
a/core/src/main/java/org/apache/gravitino/job/local/LocalJobExecutor.java
b/core/src/main/java/org/apache/gravitino/job/local/LocalJobExecutor.java
index 5d114a4605..a1c26331ef 100644
--- a/core/src/main/java/org/apache/gravitino/job/local/LocalJobExecutor.java
+++ b/core/src/main/java/org/apache/gravitino/job/local/LocalJobExecutor.java
@@ -156,12 +156,12 @@ public class LocalJobExecutor implements JobExecutor {
String newJobId = LOCAL_JOB_PREFIX + UUID.randomUUID();
Pair<String, JobTemplate> jobPair = Pair.of(newJobId, jobTemplate);
- // Add the job template to the waiting queue
- if (!waitingQueue.offer(jobPair)) {
- throw new IllegalStateException("Waiting queue is full, cannot submit
job: " + jobTemplate);
- }
-
synchronized (lock) {
+ // Add the job template to the waiting queue
+ if (!waitingQueue.offer(jobPair)) {
+ throw new IllegalStateException("Waiting queue is full, cannot submit
job: " + jobTemplate);
+ }
+
jobStatus.put(newJobId, Pair.of(JobHandle.Status.QUEUED,
UNEXPIRED_TIME_IN_MS));
}
@@ -174,7 +174,7 @@ public class LocalJobExecutor implements JobExecutor {
if (!jobStatus.containsKey(jobId)) {
throw new NoSuchJobException("No job found with ID: %s", jobId);
}
- LOG.trace(
+ LOG.debug(
"Get status {} and finished time {} for job {}",
jobStatus.get(jobId).getLeft(),
jobStatus.get(jobId).getRight(),
@@ -255,15 +255,24 @@ public class LocalJobExecutor implements JobExecutor {
try {
String jobId = jobPair.getLeft();
JobTemplate jobTemplate = jobPair.getRight();
+
+ Process process;
synchronized (lock) {
+ // This happens when the job is cancelled before it starts.
+ Pair<JobHandle.Status, Long> statusPair = jobStatus.get(jobId);
+ if (statusPair == null || statusPair.getLeft() !=
JobHandle.Status.QUEUED) {
+ LOG.warn("Job {} is not in QUEUED state, cannot start it", jobId);
+ return;
+ }
+
+ LocalProcessBuilder processBuilder =
LocalProcessBuilder.create(jobTemplate, configs);
+ process = processBuilder.start();
+ runningProcesses.put(jobId, process);
jobStatus.put(jobId, Pair.of(JobHandle.Status.STARTED,
UNEXPIRED_TIME_IN_MS));
}
- LocalProcessBuilder processBuilder =
LocalProcessBuilder.create(jobTemplate, configs);
- Process process = processBuilder.start();
- runningProcesses.put(jobId, process);
-
LOG.info("Starting job: {}", jobId);
+
int exitCode = process.waitFor();
if (exitCode == 0) {
LOG.info("Job {} completed successfully", jobId);