This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new 69d40c754f [MINOR] fix(core): Fix the race issue in `LocalJobExecutor` (#7968)
69d40c754f is described below

commit 69d40c754f4d457c310b57c9c2e1e9d0d6ca0ef1
Author: Jerry Shao <[email protected]>
AuthorDate: Fri Aug 8 17:38:38 2025 +0800

    [MINOR] fix(core): Fix the race issue in `LocalJobExecutor` (#7968)
    
    ### What changes were proposed in this pull request?
    
    Fix the race issue in `LocalJobExecutor`.
    
    ### Why are the changes needed?
    
    Fix the bug.
    
    ### Does this PR introduce _any_ user-facing change?
    
    N/A
    
    ### How was this patch tested?
    
    N/A
---
 .../gravitino/job/local/LocalJobExecutor.java      | 29 ++++++++++++++--------
 1 file changed, 19 insertions(+), 10 deletions(-)

diff --git a/core/src/main/java/org/apache/gravitino/job/local/LocalJobExecutor.java b/core/src/main/java/org/apache/gravitino/job/local/LocalJobExecutor.java
index 5d114a4605..a1c26331ef 100644
--- a/core/src/main/java/org/apache/gravitino/job/local/LocalJobExecutor.java
+++ b/core/src/main/java/org/apache/gravitino/job/local/LocalJobExecutor.java
@@ -156,12 +156,12 @@ public class LocalJobExecutor implements JobExecutor {
     String newJobId = LOCAL_JOB_PREFIX + UUID.randomUUID();
     Pair<String, JobTemplate> jobPair = Pair.of(newJobId, jobTemplate);
 
-    // Add the job template to the waiting queue
-    if (!waitingQueue.offer(jobPair)) {
-      throw new IllegalStateException("Waiting queue is full, cannot submit 
job: " + jobTemplate);
-    }
-
     synchronized (lock) {
+      // Add the job template to the waiting queue
+      if (!waitingQueue.offer(jobPair)) {
+        throw new IllegalStateException("Waiting queue is full, cannot submit 
job: " + jobTemplate);
+      }
+
       jobStatus.put(newJobId, Pair.of(JobHandle.Status.QUEUED, 
UNEXPIRED_TIME_IN_MS));
     }
 
@@ -174,7 +174,7 @@ public class LocalJobExecutor implements JobExecutor {
       if (!jobStatus.containsKey(jobId)) {
         throw new NoSuchJobException("No job found with ID: %s", jobId);
       }
-      LOG.trace(
+      LOG.debug(
           "Get status {} and finished time {} for job {}",
           jobStatus.get(jobId).getLeft(),
           jobStatus.get(jobId).getRight(),
@@ -255,15 +255,24 @@ public class LocalJobExecutor implements JobExecutor {
     try {
       String jobId = jobPair.getLeft();
       JobTemplate jobTemplate = jobPair.getRight();
+
+      Process process;
       synchronized (lock) {
+        // This happens when the job is cancelled before it starts.
+        Pair<JobHandle.Status, Long> statusPair = jobStatus.get(jobId);
+        if (statusPair == null || statusPair.getLeft() != 
JobHandle.Status.QUEUED) {
+          LOG.warn("Job {} is not in QUEUED state, cannot start it", jobId);
+          return;
+        }
+
+        LocalProcessBuilder processBuilder = 
LocalProcessBuilder.create(jobTemplate, configs);
+        process = processBuilder.start();
+        runningProcesses.put(jobId, process);
         jobStatus.put(jobId, Pair.of(JobHandle.Status.STARTED, 
UNEXPIRED_TIME_IN_MS));
       }
 
-      LocalProcessBuilder processBuilder = 
LocalProcessBuilder.create(jobTemplate, configs);
-      Process process = processBuilder.start();
-      runningProcesses.put(jobId, process);
-
       LOG.info("Starting job: {}", jobId);
+
       int exitCode = process.waitFor();
       if (exitCode == 0) {
         LOG.info("Job {} completed successfully", jobId);

Reply via email to