This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new 009710bd97 [#7743]feat(core): Add Local job executor and shell 
processor builder (#7814)
009710bd97 is described below

commit 009710bd9737e2e93e025a0111d248c26ed1d00e
Author: Jerry Shao <[email protected]>
AuthorDate: Thu Jul 31 10:15:10 2025 +0800

    [#7743]feat(core): Add Local job executor and shell processor builder 
(#7814)
    
    ### What changes were proposed in this pull request?
    
    Add the local job executor and shell process builder for Gravitino's job
    system.
    
    ### Why are the changes needed?
    
    Fix: #7743
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Add UTs to cover the code.
---
 .../apache/gravitino/job/JobExecutorFactory.java   |  20 +-
 .../java/org/apache/gravitino/job/JobManager.java  |   2 +-
 .../gravitino/job/local/LocalJobExecutor.java      | 320 +++++++++++++++++++++
 .../job/local/LocalJobExecutorConfigs.java         |  38 +++
 .../gravitino/job/local/LocalProcessBuilder.java   |  48 ++++
 .../gravitino/job/local/ShellProcessBuilder.java   |  60 ++++
 .../gravitino/job/local/TestLocalJobExecutor.java  | 228 +++++++++++++++
 core/src/test/resources/common.sh                  |  20 ++
 core/src/test/resources/test-job.sh                |  47 +++
 9 files changed, 779 insertions(+), 4 deletions(-)

diff --git 
a/core/src/main/java/org/apache/gravitino/job/JobExecutorFactory.java 
b/core/src/main/java/org/apache/gravitino/job/JobExecutorFactory.java
index cdf06c59af..7399957254 100644
--- a/core/src/main/java/org/apache/gravitino/job/JobExecutorFactory.java
+++ b/core/src/main/java/org/apache/gravitino/job/JobExecutorFactory.java
@@ -19,11 +19,14 @@
 package org.apache.gravitino.job;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
 import java.util.Map;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.gravitino.Config;
 import org.apache.gravitino.Configs;
 import org.apache.gravitino.connector.job.JobExecutor;
+import org.apache.gravitino.job.local.LocalJobExecutor;
+import org.apache.gravitino.job.local.LocalJobExecutorConfigs;
 
 public class JobExecutorFactory {
 
@@ -31,15 +34,26 @@ public class JobExecutorFactory {
 
   private static final String JOB_EXECUTOR_CLASS_SUFFIX = ".class";
 
+  private static final Map<String, String> BUILTIN_EXECUTORS =
+      ImmutableMap.of(
+          LocalJobExecutorConfigs.LOCAL_JOB_EXECUTOR_NAME,
+          LocalJobExecutor.class.getCanonicalName());
+
   private JobExecutorFactory() {
     // Private constructor to prevent instantiation
   }
 
   public static JobExecutor create(Config config) {
     String jobExecutorName = config.get(Configs.JOB_EXECUTOR);
-    String jobExecutorClassKey =
-        JOB_EXECUTOR_CONF_PREFIX + jobExecutorName + JOB_EXECUTOR_CLASS_SUFFIX;
-    String clzName = config.getRawString(jobExecutorClassKey);
+    String clzName;
+    if (BUILTIN_EXECUTORS.containsKey(jobExecutorName)) {
+      clzName = BUILTIN_EXECUTORS.get(jobExecutorName);
+    } else {
+      String jobExecutorClassKey =
+          JOB_EXECUTOR_CONF_PREFIX + jobExecutorName + 
JOB_EXECUTOR_CLASS_SUFFIX;
+      clzName = config.getRawString(jobExecutorClassKey);
+    }
+
     Preconditions.checkArgument(
         StringUtils.isNotBlank(clzName),
         "Job executor class name must be specified for job executor: %s",
diff --git a/core/src/main/java/org/apache/gravitino/job/JobManager.java 
b/core/src/main/java/org/apache/gravitino/job/JobManager.java
index e0307713b9..2b4120ec64 100644
--- a/core/src/main/java/org/apache/gravitino/job/JobManager.java
+++ b/core/src/main/java/org/apache/gravitino/job/JobManager.java
@@ -384,7 +384,7 @@ public class JobManager implements JobOperationDispatcher, 
Closeable {
   }
 
   @VisibleForTesting
-  static JobTemplate createRuntimeJobTemplate(
+  public static JobTemplate createRuntimeJobTemplate(
       JobTemplateEntity jobTemplateEntity, Map<String, String> jobConf, File 
stagingDir) {
     String name = jobTemplateEntity.name();
     String comment = jobTemplateEntity.comment();
diff --git 
a/core/src/main/java/org/apache/gravitino/job/local/LocalJobExecutor.java 
b/core/src/main/java/org/apache/gravitino/job/local/LocalJobExecutor.java
new file mode 100644
index 0000000000..8674903f8c
--- /dev/null
+++ b/core/src/main/java/org/apache/gravitino/job/local/LocalJobExecutor.java
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.job.local;
+
+import static 
org.apache.gravitino.job.local.LocalJobExecutorConfigs.DEFAULT_JOB_STATUS_KEEP_TIME_MS;
+import static 
org.apache.gravitino.job.local.LocalJobExecutorConfigs.DEFAULT_MAX_RUNNING_JOBS;
+import static 
org.apache.gravitino.job.local.LocalJobExecutorConfigs.DEFAULT_WAITING_QUEUE_SIZE;
+import static 
org.apache.gravitino.job.local.LocalJobExecutorConfigs.JOB_STATUS_KEEP_TIME_MS;
+import static 
org.apache.gravitino.job.local.LocalJobExecutorConfigs.MAX_RUNNING_JOBS;
+import static 
org.apache.gravitino.job.local.LocalJobExecutorConfigs.WAITING_QUEUE_SIZE;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.gravitino.connector.job.JobExecutor;
+import org.apache.gravitino.exceptions.NoSuchJobException;
+import org.apache.gravitino.job.JobHandle;
+import org.apache.gravitino.job.JobTemplate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LocalJobExecutor implements JobExecutor {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(LocalJobExecutor.class);
+
+  private static final String LOCAL_JOB_PREFIX = "local-job-";
+
+  private static final long UNEXPIRED_TIME_IN_MS = -1L;
+
+  private Map<String, String> configs;
+
+  private BlockingQueue<Pair<String, JobTemplate>> waitingQueue;
+
+  private ExecutorService jobExecutorService;
+
+  private ExecutorService jobPollingExecutorService;
+
+  // Tracks the status of each job together with the time (in ms) at which the job reached a
+  // terminal state. The job status is also stored in the entity store, so finished, cancelled
+  // and failed jobs are periodically removed from this map to save memory.
+  private Map<String, Pair<JobHandle.Status, Long>> jobStatus;
+  private final Object lock = new Object();
+
+  private long jobStatusKeepTimeInMs;
+  private ScheduledExecutorService jobStatusCleanupExecutor;
+
+  private volatile boolean finished = false;
+
+  private Map<String, Process> runningProcesses;
+
+  /**
+   * Initializes the executor from the given configuration and starts its background threads.
+   *
+   * <p>The waiting queue size, the maximum number of concurrently running jobs and the job status
+   * keep time are read from {@code configs}; a missing key falls back to the corresponding
+   * default in {@link LocalJobExecutorConfigs}. All values are validated before any thread is
+   * created, so an invalid configuration does not leave a polling thread behind.
+   *
+   * @param configs the executor configuration, also passed to the process builder of every job
+   * @throws IllegalArgumentException if a configured value is not greater than 0
+   * @throws NumberFormatException if a configured value cannot be parsed as a number
+   */
+  @Override
+  public void initialize(Map<String, String> configs) {
+    this.configs = configs;
+
+    int waitingQueueSize =
+        configs.containsKey(WAITING_QUEUE_SIZE)
+            ? Integer.parseInt(configs.get(WAITING_QUEUE_SIZE))
+            : DEFAULT_WAITING_QUEUE_SIZE;
+    Preconditions.checkArgument(
+        waitingQueueSize > 0,
+        "Waiting queue size must be greater than 0, but got: %s",
+        waitingQueueSize);
+
+    int maxRunningJobs =
+        configs.containsKey(MAX_RUNNING_JOBS)
+            ? Integer.parseInt(configs.get(MAX_RUNNING_JOBS))
+            : DEFAULT_MAX_RUNNING_JOBS;
+    Preconditions.checkArgument(
+        maxRunningJobs > 0,
+        "Max running jobs must be greater than 0, but got: %s",
+        maxRunningJobs);
+
+    this.jobStatusKeepTimeInMs =
+        configs.containsKey(JOB_STATUS_KEEP_TIME_MS)
+            ? Long.parseLong(configs.get(JOB_STATUS_KEEP_TIME_MS))
+            : DEFAULT_JOB_STATUS_KEEP_TIME_MS;
+    Preconditions.checkArgument(
+        jobStatusKeepTimeInMs > 0,
+        "Job status keep time must be greater than 0, but got: %s",
+        jobStatusKeepTimeInMs);
+
+    // Create all shared state before any background thread is started, so that the threads never
+    // observe a partially initialized executor.
+    this.waitingQueue = new LinkedBlockingQueue<>(waitingQueueSize);
+    this.jobStatus = Maps.newHashMap();
+    this.runningProcesses = Maps.newConcurrentMap();
+
+    // A ThreadPoolExecutor only grows beyond its core size when its work queue is full, which
+    // never happens with an unbounded queue. Use maxRunningJobs as the core size so that up to
+    // maxRunningJobs jobs really run concurrently, and let idle core threads time out.
+    ThreadPoolExecutor jobPool =
+        new ThreadPoolExecutor(
+            maxRunningJobs,
+            maxRunningJobs,
+            60L,
+            TimeUnit.SECONDS,
+            new LinkedBlockingQueue<>(),
+            runnable -> newDaemonThread("LocalJobExecutor-", runnable));
+    jobPool.allowCoreThreadTimeOut(true);
+    this.jobExecutorService = jobPool;
+
+    // scheduleAtFixedRate rejects a period of 0, which a keep time below 10 ms would produce.
+    long jobStatusCleanupIntervalInMs = Math.max(1L, jobStatusKeepTimeInMs / 10);
+    this.jobStatusCleanupExecutor =
+        Executors.newSingleThreadScheduledExecutor(
+            runnable -> newDaemonThread("LocalJobStatusCleanup-", runnable));
+    jobStatusCleanupExecutor.scheduleAtFixedRate(
+        this::cleanupJobStatus,
+        jobStatusCleanupIntervalInMs,
+        jobStatusCleanupIntervalInMs,
+        TimeUnit.MILLISECONDS);
+
+    // Start polling last: the polling thread hands jobs over to jobExecutorService.
+    this.jobPollingExecutorService =
+        Executors.newSingleThreadExecutor(
+            runnable -> newDaemonThread("LocalJobPollingExecutor-", runnable));
+    jobPollingExecutorService.submit(this::pollJob);
+  }
+
+  /** Creates a daemon thread whose name is {@code namePrefix} followed by the thread ID. */
+  private static Thread newDaemonThread(String namePrefix, Runnable runnable) {
+    Thread thread = new Thread(runnable);
+    thread.setName(namePrefix + thread.getId());
+    thread.setDaemon(true);
+    return thread;
+  }
+
+  /**
+   * Queues a job for execution and returns its newly generated ID.
+   *
+   * <p>The job is enqueued and its {@code QUEUED} status is recorded while holding {@code lock}.
+   * A worker records {@code STARTED} under the same lock, so it cannot have its status
+   * overwritten by a late {@code QUEUED} when it picks the job up immediately.
+   *
+   * @param jobTemplate the runtime job template to execute
+   * @return the ID of the submitted job
+   * @throws IllegalStateException if the waiting queue is full
+   */
+  @Override
+  public String submitJob(JobTemplate jobTemplate) {
+    String newJobId = LOCAL_JOB_PREFIX + UUID.randomUUID();
+    Pair<String, JobTemplate> jobPair = Pair.of(newJobId, jobTemplate);
+
+    synchronized (lock) {
+      // offer() never blocks, so it is safe to call while holding the lock.
+      if (!waitingQueue.offer(jobPair)) {
+        throw new IllegalStateException(
+            "Waiting queue is full, cannot submit job: " + jobTemplate);
+      }
+
+      jobStatus.put(newJobId, Pair.of(JobHandle.Status.QUEUED, UNEXPIRED_TIME_IN_MS));
+    }
+
+    return newJobId;
+  }
+
+  @Override
+  public JobHandle.Status getJobStatus(String jobId) throws NoSuchJobException 
{
+    synchronized (lock) {
+      if (!jobStatus.containsKey(jobId)) {
+        throw new NoSuchJobException("No job found with ID: %s", jobId);
+      }
+
+      return jobStatus.get(jobId).getLeft();
+    }
+  }
+
+  @Override
+  public void cancelJob(String jobId) throws NoSuchJobException {
+    synchronized (lock) {
+      if (!jobStatus.containsKey(jobId)) {
+        throw new NoSuchJobException("No job found with ID: %s", jobId);
+      }
+
+      Pair<JobHandle.Status, Long> statusPair = jobStatus.get(jobId);
+      if (statusPair.getLeft() == JobHandle.Status.SUCCEEDED
+          || statusPair.getLeft() == JobHandle.Status.FAILED
+          || statusPair.getLeft() == JobHandle.Status.CANCELLED) {
+        LOG.warn("Job {} is already completed or cancelled, no action taken", 
jobId);
+        return;
+      }
+
+      // If the job is queued.
+      if (statusPair.getLeft() == JobHandle.Status.QUEUED) {
+        waitingQueue.removeIf(p -> p.getLeft().equals(jobId));
+        jobStatus.put(jobId, Pair.of(JobHandle.Status.CANCELLED, 
System.currentTimeMillis()));
+        LOG.info("Job {} is cancelled from the waiting queue", jobId);
+        return;
+      }
+
+      if (statusPair.getLeft() == JobHandle.Status.STARTED) {
+        Process process = runningProcesses.get(jobId);
+        if (process != null) {
+          process.destroy();
+        }
+        LOG.info("Job {} is cancelling while running", jobId);
+        jobStatus.put(jobId, Pair.of(JobHandle.Status.CANCELLING, 
UNEXPIRED_TIME_IN_MS));
+      }
+    }
+  }
+
+  /**
+   * Stops the executor: stops polling, drops the jobs still waiting in the queue, destroys the
+   * running processes and shuts down all background executors.
+   *
+   * @throws IOException declared by the interface; not thrown by this implementation
+   */
+  @Override
+  public void close() throws IOException {
+    // Mark the executor as finished to stop processing jobs
+    this.finished = true;
+    jobPollingExecutorService.shutdownNow();
+
+    if (!waitingQueue.isEmpty()) {
+      LOG.warn(
+          "There are still {} jobs in the waiting queue, they will not be processed.",
+          waitingQueue.size());
+      waitingQueue.clear();
+    }
+
+    // Stop the running jobs
+    runningProcesses.forEach(
+        (key, process) -> {
+          if (process != null) {
+            process.destroy();
+          }
+        });
+    runningProcesses.clear();
+
+    jobExecutorService.shutdownNow();
+
+    // Stop the job status cleanup executor
+    jobStatusCleanupExecutor.shutdownNow();
+
+    // jobStatus is a plain HashMap guarded by lock everywhere else; worker threads may still be
+    // updating it while they wind down, so clear it under the same lock.
+    synchronized (lock) {
+      jobStatus.clear();
+    }
+  }
+
+  public void runJob(Pair<String, JobTemplate> jobPair) {
+    try {
+      String jobId = jobPair.getLeft();
+      JobTemplate jobTemplate = jobPair.getRight();
+      synchronized (lock) {
+        jobStatus.put(jobId, Pair.of(JobHandle.Status.STARTED, 
UNEXPIRED_TIME_IN_MS));
+      }
+
+      LocalProcessBuilder processBuilder = 
LocalProcessBuilder.create(jobTemplate, configs);
+      Process process = processBuilder.start();
+      runningProcesses.put(jobId, process);
+
+      LOG.info("Starting job: {}", jobId);
+      int exitCode = process.waitFor();
+      if (exitCode == 0) {
+        LOG.info("Job {} completed successfully", jobId);
+        synchronized (lock) {
+          jobStatus.put(jobId, Pair.of(JobHandle.Status.SUCCEEDED, 
System.currentTimeMillis()));
+        }
+      } else {
+        synchronized (lock) {
+          JobHandle.Status oldStatus = jobStatus.get(jobId).getLeft();
+          if (oldStatus == JobHandle.Status.CANCELLING) {
+            LOG.info("Job {} was cancelled while running with exit code: {}", 
jobId, exitCode);
+            jobStatus.put(jobId, Pair.of(JobHandle.Status.CANCELLED, 
System.currentTimeMillis()));
+          } else if (oldStatus == JobHandle.Status.STARTED) {
+            LOG.warn("Job {} failed after starting with exit code: {}", jobId, 
exitCode);
+            jobStatus.put(jobId, Pair.of(JobHandle.Status.FAILED, 
System.currentTimeMillis()));
+          }
+        }
+      }
+
+      runningProcesses.remove(jobId);
+
+    } catch (Exception e) {
+      LOG.error("Error while executing job", e);
+      // If an error occurs, we should mark the job as failed
+      synchronized (lock) {
+        String jobId = jobPair.getLeft();
+        jobStatus.put(jobId, Pair.of(JobHandle.Status.FAILED, 
System.currentTimeMillis()));
+      }
+    }
+  }
+
+  /**
+   * Polling loop: takes jobs from the waiting queue and hands them to the job executor service
+   * until the executor is marked as finished or the polling thread is interrupted.
+   */
+  public void pollJob() {
+    while (!finished) {
+      try {
+        // Poll with a timeout so that the finished flag is re-checked regularly.
+        Pair<String, JobTemplate> jobPair = waitingQueue.poll(3000, TimeUnit.MILLISECONDS);
+        if (jobPair == null) {
+          // If no job is available, continue to the next iteration
+          continue;
+        }
+
+        jobExecutorService.submit(() -> runJob(jobPair));
+
+      } catch (InterruptedException e) {
+        LOG.warn("Polling job interrupted", e);
+        // Restore the interrupt flag so that the owning executor sees the interruption.
+        Thread.currentThread().interrupt();
+        finished = true;
+      }
+    }
+  }
+
+  /**
+   * Removes every job status whose recorded time is not {@code UNEXPIRED_TIME_IN_MS} and is at
+   * least {@code jobStatusKeepTimeInMs} milliseconds in the past.
+   */
+  @VisibleForTesting
+  void cleanupJobStatus() {
+    long now = System.currentTimeMillis();
+
+    synchronized (lock) {
+      jobStatus
+          .values()
+          .removeIf(
+              statusAndTime -> {
+                long recordedAt = statusAndTime.getRight();
+                if (recordedAt == UNEXPIRED_TIME_IN_MS) {
+                  // Jobs without a recorded time are kept.
+                  return false;
+                }
+                return now - recordedAt >= jobStatusKeepTimeInMs;
+              });
+    }
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/job/local/LocalJobExecutorConfigs.java
 
b/core/src/main/java/org/apache/gravitino/job/local/LocalJobExecutorConfigs.java
new file mode 100644
index 0000000000..627f7b1fa1
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/job/local/LocalJobExecutorConfigs.java
@@ -0,0 +1,38 @@
+package org.apache.gravitino.job.local;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/** Configuration keys and default values used by the local job executor. */
+public class LocalJobExecutorConfigs {
+
+  /** Name under which the local job executor is registered as a built-in executor. */
+  public static final String LOCAL_JOB_EXECUTOR_NAME = "local";
+
+  /** Configuration key for the capacity of the queue holding jobs that wait to be run. */
+  public static final String WAITING_QUEUE_SIZE = "waitingQueueSize";
+
+  /** Waiting queue capacity used when {@link #WAITING_QUEUE_SIZE} is not configured. */
+  public static final int DEFAULT_WAITING_QUEUE_SIZE = 100;
+
+  /** Configuration key for the maximum number of jobs running at the same time. */
+  public static final String MAX_RUNNING_JOBS = "maxRunningJobs";
+
+  /** Half of the available processors, clamped to the range [1, 10]. */
+  public static final int DEFAULT_MAX_RUNNING_JOBS =
+      Math.min(10, Math.max(1, Runtime.getRuntime().availableProcessors() / 2));
+
+  /** Configuration key for how long (in ms) the status of a finished job is kept in memory. */
+  public static final String JOB_STATUS_KEEP_TIME_MS = "jobStatusKeepTimeInMs";
+
+  /** One hour, in milliseconds. */
+  public static final long DEFAULT_JOB_STATUS_KEEP_TIME_MS = 60L * 60L * 1000L;
+
+  private LocalJobExecutorConfigs() {
+    // Private constructor to prevent instantiation
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/job/local/LocalProcessBuilder.java 
b/core/src/main/java/org/apache/gravitino/job/local/LocalProcessBuilder.java
new file mode 100644
index 0000000000..4307d62cb0
--- /dev/null
+++ b/core/src/main/java/org/apache/gravitino/job/local/LocalProcessBuilder.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.job.local;
+
+import java.io.File;
+import java.util.Map;
+import org.apache.gravitino.job.JobTemplate;
+import org.apache.gravitino.job.ShellJobTemplate;
+
+/** Base class of the builders that start a local process for a {@link JobTemplate}. */
+public abstract class LocalProcessBuilder {
+
+  protected final JobTemplate jobTemplate;
+
+  protected final File workingDirectory;
+
+  /**
+   * Creates a builder whose working directory is the parent directory of the job's executable.
+   *
+   * @param jobTemplate the job template describing the process to start
+   * @param configs the executor configuration; not read by this constructor
+   */
+  protected LocalProcessBuilder(JobTemplate jobTemplate, Map<String, String> configs) {
+    this.jobTemplate = jobTemplate;
+    // The executable is expected to be located in the working directory, so the working
+    // directory is derived from the path of the executable.
+    File executable = new File(jobTemplate.executable());
+    this.workingDirectory = executable.getParentFile();
+  }
+
+  /**
+   * Starts the process described by the job template.
+   *
+   * @return the started process
+   */
+  public abstract Process start();
+
+  /**
+   * Creates the process builder matching the type of the given job template.
+   *
+   * @param jobTemplate the job template to build a process for
+   * @param configs the executor configuration handed to the created builder
+   * @return a {@link ShellProcessBuilder} for a {@link ShellJobTemplate}
+   * @throws IllegalArgumentException if the job template is of any other type
+   */
+  public static LocalProcessBuilder create(JobTemplate jobTemplate, Map<String, String> configs) {
+    if (!(jobTemplate instanceof ShellJobTemplate)) {
+      throw new IllegalArgumentException(
+          "Unsupported job template type: " + jobTemplate.jobType());
+    }
+
+    ShellJobTemplate shellJobTemplate = (ShellJobTemplate) jobTemplate;
+    return new ShellProcessBuilder(shellJobTemplate, configs);
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/job/local/ShellProcessBuilder.java 
b/core/src/main/java/org/apache/gravitino/job/local/ShellProcessBuilder.java
new file mode 100644
index 0000000000..57e3b20c05
--- /dev/null
+++ b/core/src/main/java/org/apache/gravitino/job/local/ShellProcessBuilder.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.job.local;
+
+import com.google.common.collect.Lists;
+import java.io.File;
+import java.util.List;
+import java.util.Map;
+import org.apache.gravitino.job.ShellJobTemplate;
+
+public class ShellProcessBuilder extends LocalProcessBuilder {
+
+  protected ShellProcessBuilder(ShellJobTemplate shellJobTemplate, Map<String, 
String> configs) {
+    super(shellJobTemplate, configs);
+  }
+
+  @Override
+  public Process start() {
+    ShellJobTemplate shellJobTemplate = (ShellJobTemplate) jobTemplate;
+    File executableFile = new File(shellJobTemplate.executable());
+    if (!executableFile.canExecute()) {
+      executableFile.setExecutable(true);
+    }
+
+    List<String> commandList = 
Lists.newArrayList(shellJobTemplate.executable());
+    commandList.addAll(shellJobTemplate.arguments());
+
+    ProcessBuilder builder = new ProcessBuilder(commandList);
+    builder.directory(workingDirectory);
+    builder.environment().putAll(shellJobTemplate.environments());
+
+    File outputFile = new File(workingDirectory, "output.log");
+    File errorFile = new File(workingDirectory, "error.log");
+    builder.redirectOutput(outputFile);
+    builder.redirectError(errorFile);
+
+    try {
+      return builder.start();
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to start shell process", e);
+    }
+  }
+}
diff --git 
a/core/src/test/java/org/apache/gravitino/job/local/TestLocalJobExecutor.java 
b/core/src/test/java/org/apache/gravitino/job/local/TestLocalJobExecutor.java
new file mode 100644
index 0000000000..c88c0811df
--- /dev/null
+++ 
b/core/src/test/java/org/apache/gravitino/job/local/TestLocalJobExecutor.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.job.local;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.Files;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.gravitino.connector.job.JobExecutor;
+import org.apache.gravitino.job.JobHandle;
+import org.apache.gravitino.job.JobManager;
+import org.apache.gravitino.job.JobTemplate;
+import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.meta.JobTemplateEntity;
+import org.apache.gravitino.utils.NamespaceUtil;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.shaded.org.apache.commons.io.FileUtils;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+
+public class TestLocalJobExecutor {
+
+  private static JobExecutor jobExecutor;
+
+  private static JobTemplateEntity jobTemplateEntity;
+
+  private static File workingDir;
+
+  @BeforeAll
+  public static void setUpClass() throws IOException {
+    jobExecutor = new LocalJobExecutor();
+    jobExecutor.initialize(Collections.emptyMap());
+
+    URL testJobScriptUrl = 
TestLocalJobExecutor.class.getResource("/test-job.sh");
+    Assertions.assertNotNull(testJobScriptUrl);
+    File testJobScriptFile = new File(testJobScriptUrl.getFile());
+
+    URL commonScriptUrl = TestLocalJobExecutor.class.getResource("/common.sh");
+    Assertions.assertNotNull(commonScriptUrl);
+    File commonScriptFile = new File(commonScriptUrl.getFile());
+
+    JobTemplateEntity.TemplateContent templateContent =
+        JobTemplateEntity.TemplateContent.builder()
+            .withExecutable(testJobScriptFile.getAbsolutePath())
+            .withArguments(Lists.newArrayList("{{arg1}}", "{{arg2}}"))
+            .withEnvironments(ImmutableMap.of("ENV_VAR", "{{var}}"))
+            .withJobType(JobTemplate.JobType.SHELL)
+            
.withScripts(Lists.newArrayList(commonScriptFile.getAbsolutePath()))
+            .withCustomFields(Collections.emptyMap())
+            .build();
+    jobTemplateEntity =
+        JobTemplateEntity.builder()
+            .withId(1L)
+            .withName("test-job-template")
+            .withNamespace(NamespaceUtil.ofJobTemplate("test"))
+            .withComment("test")
+            .withTemplateContent(templateContent)
+            .withAuditInfo(AuditInfo.EMPTY)
+            .build();
+  }
+
+  @AfterAll
+  public static void tearDownClass() throws IOException {
+    if (jobExecutor != null) {
+      jobExecutor.close();
+      jobExecutor = null;
+    }
+  }
+
+  @BeforeEach
+  public void setUp() throws IOException {
+    workingDir = 
Files.createTempDirectory("gravitino-test-local-job-executor").toFile();
+  }
+
+  /**
+   * Deletes the per-test working directory after each test.
+   *
+   * @throws IOException if the directory cannot be deleted
+   */
+  @AfterEach
+  public void tearDown() throws IOException {
+    if (workingDir != null && workingDir.exists()) {
+      FileUtils.deleteDirectory(workingDir);
+    }
+  }
+
+  @Test
+  public void testSubmitJobSuccessfully() throws IOException {
+    Map<String, String> jobConf =
+        ImmutableMap.of(
+            "arg1", "value1",
+            "arg2", "success",
+            "var", "value3");
+
+    JobTemplate template =
+        JobManager.createRuntimeJobTemplate(jobTemplateEntity, jobConf, 
workingDir);
+
+    String jobId = jobExecutor.submitJob(template);
+    Assertions.assertNotNull(jobId);
+
+    Awaitility.await()
+        .atMost(10, TimeUnit.SECONDS)
+        .until(() -> jobExecutor.getJobStatus(jobId) == 
JobHandle.Status.SUCCEEDED);
+
+    String output = FileUtils.readFileToString(new File(workingDir, 
"output.log"), "UTF-8");
+    Assertions.assertTrue(output.contains("value1"));
+    Assertions.assertTrue(output.contains("success"));
+    Assertions.assertTrue(output.contains("value3"));
+    Assertions.assertTrue(output.contains("in common script"));
+
+    Assertions.assertEquals(JobHandle.Status.SUCCEEDED, 
jobExecutor.getJobStatus(jobId));
+  }
+
+  @Test
+  public void testSubmitJobFailure() throws IOException {
+    Map<String, String> jobConf =
+        ImmutableMap.of(
+            "arg1", "value1",
+            "arg2", "fail",
+            "var", "value3");
+
+    JobTemplate template =
+        JobManager.createRuntimeJobTemplate(jobTemplateEntity, jobConf, 
workingDir);
+
+    String jobId = jobExecutor.submitJob(template);
+    Assertions.assertNotNull(jobId);
+
+    Awaitility.await()
+        .atMost(10, TimeUnit.SECONDS)
+        .until(() -> jobExecutor.getJobStatus(jobId) == 
JobHandle.Status.FAILED);
+
+    String output = FileUtils.readFileToString(new File(workingDir, 
"output.log"), "UTF-8");
+    Assertions.assertTrue(output.contains("value1"));
+    Assertions.assertTrue(output.contains("fail"));
+    Assertions.assertTrue(output.contains("value3"));
+    Assertions.assertTrue(output.contains("in common script"));
+
+    Assertions.assertEquals(JobHandle.Status.FAILED, 
jobExecutor.getJobStatus(jobId));
+  }
+
+  @Test
+  public void testCancelJob() throws InterruptedException {
+    Map<String, String> jobConf =
+        ImmutableMap.of(
+            "arg1", "value1",
+            "arg2", "success",
+            "var", "value3");
+
+    JobTemplate template =
+        JobManager.createRuntimeJobTemplate(jobTemplateEntity, jobConf, 
workingDir);
+
+    String jobId = jobExecutor.submitJob(template);
+    Assertions.assertNotNull(jobId);
+    // sleep a while to ensure the job is running.
+    Thread.sleep(100);
+
+    jobExecutor.cancelJob(jobId);
+
+    Awaitility.await()
+        .atMost(10, TimeUnit.SECONDS)
+        .until(() -> jobExecutor.getJobStatus(jobId) == 
JobHandle.Status.CANCELLED);
+
+    Assertions.assertEquals(JobHandle.Status.CANCELLED, 
jobExecutor.getJobStatus(jobId));
+
+    // Cancelling a job that is already cancelled.
+    Assertions.assertDoesNotThrow(() -> jobExecutor.cancelJob(jobId));
+    Assertions.assertEquals(JobHandle.Status.CANCELLED, 
jobExecutor.getJobStatus(jobId));
+  }
+
+  @Test
+  public void testCancelSucceededJob() {
+    // Cancelling a job that is already succeeded.
+    Map<String, String> successJobConf =
+        ImmutableMap.of(
+            "arg1", "value1",
+            "arg2", "success",
+            "var", "value3");
+
+    JobTemplate successTemplate =
+        JobManager.createRuntimeJobTemplate(jobTemplateEntity, successJobConf, 
workingDir);
+    String successJobId = jobExecutor.submitJob(successTemplate);
+    Awaitility.await()
+        .atMost(10, TimeUnit.SECONDS)
+        .until(() -> jobExecutor.getJobStatus(successJobId) == 
JobHandle.Status.SUCCEEDED);
+
+    Assertions.assertDoesNotThrow(() -> jobExecutor.cancelJob(successJobId));
+    Assertions.assertEquals(JobHandle.Status.SUCCEEDED, 
jobExecutor.getJobStatus(successJobId));
+  }
+
+  @Test
+  public void testCancelFailedJob() {
+    // Cancelling a job that is already failed.
+    Map<String, String> failJobConf =
+        ImmutableMap.of(
+            "arg1", "value1",
+            "arg2", "fail",
+            "var", "value3");
+
+    JobTemplate failTemplate =
+        JobManager.createRuntimeJobTemplate(jobTemplateEntity, failJobConf, 
workingDir);
+    String failJobId = jobExecutor.submitJob(failTemplate);
+    Awaitility.await()
+        .atMost(10, TimeUnit.SECONDS)
+        .until(() -> jobExecutor.getJobStatus(failJobId) == 
JobHandle.Status.FAILED);
+
+    Assertions.assertDoesNotThrow(() -> jobExecutor.cancelJob(failJobId));
+    Assertions.assertEquals(JobHandle.Status.FAILED, 
jobExecutor.getJobStatus(failJobId));
+  }
+}
diff --git a/core/src/test/resources/common.sh 
b/core/src/test/resources/common.sh
new file mode 100755
index 0000000000..4a29c32441
--- /dev/null
+++ b/core/src/test/resources/common.sh
@@ -0,0 +1,20 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+echo "in common script"
diff --git a/core/src/test/resources/test-job.sh 
b/core/src/test/resources/test-job.sh
new file mode 100755
index 0000000000..a7725e3b7b
--- /dev/null
+++ b/core/src/test/resources/test-job.sh
@@ -0,0 +1,47 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# This script is used to test the job submission and execution in a local 
environment.
+echo "starting test test job"
+
+bin="$(dirname "${BASH_SOURCE-$0}")"
+bin="$(cd "${bin}">/dev/null; pwd)"
+
+. "${bin}/common.sh"
+
+sleep 3
+
+JOB_NAME="test_job-$(date +%s)-$1"
+
+echo "Submitting job with name: $JOB_NAME"
+
+echo "$1"
+
+echo "$2"
+
+echo "$ENV_VAR"
+
+if [[ "$2" == "success" ]]; then
+  exit 0
+elif [[ "$2" == "fail" ]]; then
+  exit 1
+else
+  exit 2
+fi

Reply via email to