This is an automated email from the ASF dual-hosted git repository.
jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new 009710bd97 [#7743]feat(core): Add Local job executor and shell
processor builder (#7814)
009710bd97 is described below
commit 009710bd9737e2e93e025a0111d248c26ed1d00e
Author: Jerry Shao <[email protected]>
AuthorDate: Thu Jul 31 10:15:10 2025 +0800
[#7743]feat(core): Add Local job executor and shell processor builder
(#7814)
### What changes were proposed in this pull request?
Add the local job executor and shell process builder for Gravitino's job
system.
### Why are the changes needed?
Fix: #7743
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Added unit tests (`TestLocalJobExecutor`) covering successful, failed, and cancelled job runs.
---
.../apache/gravitino/job/JobExecutorFactory.java | 20 +-
.../java/org/apache/gravitino/job/JobManager.java | 2 +-
.../gravitino/job/local/LocalJobExecutor.java | 320 +++++++++++++++++++++
.../job/local/LocalJobExecutorConfigs.java | 38 +++
.../gravitino/job/local/LocalProcessBuilder.java | 48 ++++
.../gravitino/job/local/ShellProcessBuilder.java | 60 ++++
.../gravitino/job/local/TestLocalJobExecutor.java | 228 +++++++++++++++
core/src/test/resources/common.sh | 20 ++
core/src/test/resources/test-job.sh | 47 +++
9 files changed, 779 insertions(+), 4 deletions(-)
diff --git
a/core/src/main/java/org/apache/gravitino/job/JobExecutorFactory.java
b/core/src/main/java/org/apache/gravitino/job/JobExecutorFactory.java
index cdf06c59af..7399957254 100644
--- a/core/src/main/java/org/apache/gravitino/job/JobExecutorFactory.java
+++ b/core/src/main/java/org/apache/gravitino/job/JobExecutorFactory.java
@@ -19,11 +19,14 @@
package org.apache.gravitino.job;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.gravitino.Config;
import org.apache.gravitino.Configs;
import org.apache.gravitino.connector.job.JobExecutor;
+import org.apache.gravitino.job.local.LocalJobExecutor;
+import org.apache.gravitino.job.local.LocalJobExecutorConfigs;
public class JobExecutorFactory {
@@ -31,15 +34,26 @@ public class JobExecutorFactory {
private static final String JOB_EXECUTOR_CLASS_SUFFIX = ".class";
+ private static final Map<String, String> BUILTIN_EXECUTORS =
+ ImmutableMap.of(
+ LocalJobExecutorConfigs.LOCAL_JOB_EXECUTOR_NAME,
+ LocalJobExecutor.class.getCanonicalName());
+
private JobExecutorFactory() {
// Private constructor to prevent instantiation
}
public static JobExecutor create(Config config) {
String jobExecutorName = config.get(Configs.JOB_EXECUTOR);
- String jobExecutorClassKey =
- JOB_EXECUTOR_CONF_PREFIX + jobExecutorName + JOB_EXECUTOR_CLASS_SUFFIX;
- String clzName = config.getRawString(jobExecutorClassKey);
+ String clzName;
+ if (BUILTIN_EXECUTORS.containsKey(jobExecutorName)) {
+ clzName = BUILTIN_EXECUTORS.get(jobExecutorName);
+ } else {
+ String jobExecutorClassKey =
+ JOB_EXECUTOR_CONF_PREFIX + jobExecutorName +
JOB_EXECUTOR_CLASS_SUFFIX;
+ clzName = config.getRawString(jobExecutorClassKey);
+ }
+
Preconditions.checkArgument(
StringUtils.isNotBlank(clzName),
"Job executor class name must be specified for job executor: %s",
diff --git a/core/src/main/java/org/apache/gravitino/job/JobManager.java
b/core/src/main/java/org/apache/gravitino/job/JobManager.java
index e0307713b9..2b4120ec64 100644
--- a/core/src/main/java/org/apache/gravitino/job/JobManager.java
+++ b/core/src/main/java/org/apache/gravitino/job/JobManager.java
@@ -384,7 +384,7 @@ public class JobManager implements JobOperationDispatcher,
Closeable {
}
@VisibleForTesting
- static JobTemplate createRuntimeJobTemplate(
+ public static JobTemplate createRuntimeJobTemplate(
JobTemplateEntity jobTemplateEntity, Map<String, String> jobConf, File
stagingDir) {
String name = jobTemplateEntity.name();
String comment = jobTemplateEntity.comment();
diff --git
a/core/src/main/java/org/apache/gravitino/job/local/LocalJobExecutor.java
b/core/src/main/java/org/apache/gravitino/job/local/LocalJobExecutor.java
new file mode 100644
index 0000000000..8674903f8c
--- /dev/null
+++ b/core/src/main/java/org/apache/gravitino/job/local/LocalJobExecutor.java
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.job.local;
+
+import static
org.apache.gravitino.job.local.LocalJobExecutorConfigs.DEFAULT_JOB_STATUS_KEEP_TIME_MS;
+import static
org.apache.gravitino.job.local.LocalJobExecutorConfigs.DEFAULT_MAX_RUNNING_JOBS;
+import static
org.apache.gravitino.job.local.LocalJobExecutorConfigs.DEFAULT_WAITING_QUEUE_SIZE;
+import static
org.apache.gravitino.job.local.LocalJobExecutorConfigs.JOB_STATUS_KEEP_TIME_MS;
+import static
org.apache.gravitino.job.local.LocalJobExecutorConfigs.MAX_RUNNING_JOBS;
+import static
org.apache.gravitino.job.local.LocalJobExecutorConfigs.WAITING_QUEUE_SIZE;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.gravitino.connector.job.JobExecutor;
+import org.apache.gravitino.exceptions.NoSuchJobException;
+import org.apache.gravitino.job.JobHandle;
+import org.apache.gravitino.job.JobTemplate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LocalJobExecutor implements JobExecutor {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(LocalJobExecutor.class);
+
+ private static final String LOCAL_JOB_PREFIX = "local-job-";
+
+ private static final long UNEXPIRED_TIME_IN_MS = -1L;
+
+ private Map<String, String> configs;
+
+ private BlockingQueue<Pair<String, JobTemplate>> waitingQueue;
+
+ private ExecutorService jobExecutorService;
+
+ private ExecutorService jobPollingExecutorService;
+
+ // The job status map to keep track of the status of each job. In the
meantime, the job status
+ // will be stored in the entity store, so we will clean the finished,
cancelled and failed jobs
+ // from the map periodically to save the memory.
+ private Map<String, Pair<JobHandle.Status, Long>> jobStatus;
+ private final Object lock = new Object();
+
+ private long jobStatusKeepTimeInMs;
+ private ScheduledExecutorService jobStatusCleanupExecutor;
+
+ private volatile boolean finished = false;
+
+ private Map<String, Process> runningProcesses;
+
+ @Override
+ public void initialize(Map<String, String> configs) {
+ this.configs = configs;
+
+ int waitingQueueSize =
+ configs.containsKey(WAITING_QUEUE_SIZE)
+ ? Integer.parseInt(configs.get(WAITING_QUEUE_SIZE))
+ : DEFAULT_WAITING_QUEUE_SIZE;
+ Preconditions.checkArgument(
+ waitingQueueSize > 0,
+ "Waiting queue size must be greater than 0, but got: %s",
+ waitingQueueSize);
+
+ this.waitingQueue = new LinkedBlockingQueue<>(waitingQueueSize);
+
+ this.jobPollingExecutorService =
+ Executors.newSingleThreadExecutor(
+ runnable -> {
+ Thread thread = new Thread(runnable);
+ thread.setName("LocalJobPollingExecutor-" + thread.getId());
+ thread.setDaemon(true);
+ return thread;
+ });
+ jobPollingExecutorService.submit(this::pollJob);
+
+ int maxRunningJobs =
+ configs.containsKey(MAX_RUNNING_JOBS)
+ ? Integer.parseInt(configs.get(MAX_RUNNING_JOBS))
+ : DEFAULT_MAX_RUNNING_JOBS;
+ Preconditions.checkArgument(
+ maxRunningJobs > 0, "Max running jobs must be greater than 0, but got:
%s", maxRunningJobs);
+
+ this.jobExecutorService =
+ new ThreadPoolExecutor(
+ 0,
+ maxRunningJobs,
+ 60L,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue<>(),
+ runnable -> {
+ Thread thread = new Thread(runnable);
+ thread.setName("LocalJobExecutor-" + thread.getId());
+ thread.setDaemon(true);
+ return thread;
+ });
+
+ this.jobStatus = Maps.newHashMap();
+
+ this.jobStatusKeepTimeInMs =
+ configs.containsKey(JOB_STATUS_KEEP_TIME_MS)
+ ? Long.parseLong(configs.get(JOB_STATUS_KEEP_TIME_MS))
+ : DEFAULT_JOB_STATUS_KEEP_TIME_MS;
+ Preconditions.checkArgument(
+ jobStatusKeepTimeInMs > 0,
+ "Job status keep time must be greater than 0, but got: %s",
+ jobStatusKeepTimeInMs);
+
+ long jobStatusCleanupIntervalInMs = jobStatusKeepTimeInMs / 10;
+ this.jobStatusCleanupExecutor =
+ Executors.newSingleThreadScheduledExecutor(
+ runnable -> {
+ Thread thread = new Thread(runnable);
+ thread.setName("LocalJobStatusCleanup-" + thread.getId());
+ thread.setDaemon(true);
+ return thread;
+ });
+ jobStatusCleanupExecutor.scheduleAtFixedRate(
+ this::cleanupJobStatus,
+ jobStatusCleanupIntervalInMs,
+ jobStatusCleanupIntervalInMs,
+ TimeUnit.MILLISECONDS);
+
+ this.runningProcesses = Maps.newConcurrentMap();
+ }
+
+ @Override
+ public String submitJob(JobTemplate jobTemplate) {
+ String newJobId = LOCAL_JOB_PREFIX + UUID.randomUUID();
+ Pair<String, JobTemplate> jobPair = Pair.of(newJobId, jobTemplate);
+
+ // Add the job template to the waiting queue
+ if (!waitingQueue.offer(jobPair)) {
+ throw new IllegalStateException("Waiting queue is full, cannot submit
job: " + jobTemplate);
+ }
+
+ synchronized (lock) {
+ jobStatus.put(newJobId, Pair.of(JobHandle.Status.QUEUED,
UNEXPIRED_TIME_IN_MS));
+ }
+
+ return newJobId;
+ }
+
+ @Override
+ public JobHandle.Status getJobStatus(String jobId) throws NoSuchJobException
{
+ synchronized (lock) {
+ if (!jobStatus.containsKey(jobId)) {
+ throw new NoSuchJobException("No job found with ID: %s", jobId);
+ }
+
+ return jobStatus.get(jobId).getLeft();
+ }
+ }
+
+ @Override
+ public void cancelJob(String jobId) throws NoSuchJobException {
+ synchronized (lock) {
+ if (!jobStatus.containsKey(jobId)) {
+ throw new NoSuchJobException("No job found with ID: %s", jobId);
+ }
+
+ Pair<JobHandle.Status, Long> statusPair = jobStatus.get(jobId);
+ if (statusPair.getLeft() == JobHandle.Status.SUCCEEDED
+ || statusPair.getLeft() == JobHandle.Status.FAILED
+ || statusPair.getLeft() == JobHandle.Status.CANCELLED) {
+ LOG.warn("Job {} is already completed or cancelled, no action taken",
jobId);
+ return;
+ }
+
+ // If the job is queued.
+ if (statusPair.getLeft() == JobHandle.Status.QUEUED) {
+ waitingQueue.removeIf(p -> p.getLeft().equals(jobId));
+ jobStatus.put(jobId, Pair.of(JobHandle.Status.CANCELLED,
System.currentTimeMillis()));
+ LOG.info("Job {} is cancelled from the waiting queue", jobId);
+ return;
+ }
+
+ if (statusPair.getLeft() == JobHandle.Status.STARTED) {
+ Process process = runningProcesses.get(jobId);
+ if (process != null) {
+ process.destroy();
+ }
+ LOG.info("Job {} is cancelling while running", jobId);
+ jobStatus.put(jobId, Pair.of(JobHandle.Status.CANCELLING,
UNEXPIRED_TIME_IN_MS));
+ }
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ // Mark the executor as finished to stop processing jobs
+ this.finished = true;
+ jobPollingExecutorService.shutdownNow();
+
+ if (!waitingQueue.isEmpty()) {
+ LOG.warn(
+ "There are still {} jobs in the waiting queue, they will not be
processed.",
+ waitingQueue.size());
+ waitingQueue.clear();
+ }
+
+ // Stop the running jobs
+ runningProcesses.forEach(
+ (key, process) -> {
+ if (process != null) {
+ process.destroy();
+ }
+ });
+ runningProcesses.clear();
+
+ jobExecutorService.shutdownNow();
+
+ // Stop the job status cleanup executor
+ jobStatusCleanupExecutor.shutdownNow();
+ jobStatus.clear();
+ }
+
+ public void runJob(Pair<String, JobTemplate> jobPair) {
+ try {
+ String jobId = jobPair.getLeft();
+ JobTemplate jobTemplate = jobPair.getRight();
+ synchronized (lock) {
+ jobStatus.put(jobId, Pair.of(JobHandle.Status.STARTED,
UNEXPIRED_TIME_IN_MS));
+ }
+
+ LocalProcessBuilder processBuilder =
LocalProcessBuilder.create(jobTemplate, configs);
+ Process process = processBuilder.start();
+ runningProcesses.put(jobId, process);
+
+ LOG.info("Starting job: {}", jobId);
+ int exitCode = process.waitFor();
+ if (exitCode == 0) {
+ LOG.info("Job {} completed successfully", jobId);
+ synchronized (lock) {
+ jobStatus.put(jobId, Pair.of(JobHandle.Status.SUCCEEDED,
System.currentTimeMillis()));
+ }
+ } else {
+ synchronized (lock) {
+ JobHandle.Status oldStatus = jobStatus.get(jobId).getLeft();
+ if (oldStatus == JobHandle.Status.CANCELLING) {
+ LOG.info("Job {} was cancelled while running with exit code: {}",
jobId, exitCode);
+ jobStatus.put(jobId, Pair.of(JobHandle.Status.CANCELLED,
System.currentTimeMillis()));
+ } else if (oldStatus == JobHandle.Status.STARTED) {
+ LOG.warn("Job {} failed after starting with exit code: {}", jobId,
exitCode);
+ jobStatus.put(jobId, Pair.of(JobHandle.Status.FAILED,
System.currentTimeMillis()));
+ }
+ }
+ }
+
+ runningProcesses.remove(jobId);
+
+ } catch (Exception e) {
+ LOG.error("Error while executing job", e);
+ // If an error occurs, we should mark the job as failed
+ synchronized (lock) {
+ String jobId = jobPair.getLeft();
+ jobStatus.put(jobId, Pair.of(JobHandle.Status.FAILED,
System.currentTimeMillis()));
+ }
+ }
+ }
+
+ public void pollJob() {
+ while (!finished) {
+ try {
+ Pair<String, JobTemplate> jobPair = waitingQueue.poll(3000,
TimeUnit.MILLISECONDS);
+ if (jobPair == null) {
+ // If no job is available, continue to the next iteration
+ continue;
+ }
+
+ jobExecutorService.submit(() -> runJob(jobPair));
+
+ } catch (InterruptedException e) {
+ LOG.warn("Polling job interrupted", e);
+ finished = true;
+ }
+ }
+ }
+
+ @VisibleForTesting
+ void cleanupJobStatus() {
+ long currentTime = System.currentTimeMillis();
+
+ synchronized (lock) {
+ jobStatus
+ .entrySet()
+ .removeIf(
+ entry ->
+ entry.getValue().getRight() != UNEXPIRED_TIME_IN_MS
+ && (currentTime - entry.getValue().getRight()) >=
jobStatusKeepTimeInMs);
+ }
+ }
+}
diff --git
a/core/src/main/java/org/apache/gravitino/job/local/LocalJobExecutorConfigs.java
b/core/src/main/java/org/apache/gravitino/job/local/LocalJobExecutorConfigs.java
new file mode 100644
index 0000000000..627f7b1fa1
--- /dev/null
+++
b/core/src/main/java/org/apache/gravitino/job/local/LocalJobExecutorConfigs.java
@@ -0,0 +1,38 @@
+package org.apache.gravitino.job.local;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
/** Configuration keys and default values understood by the local job executor. */
public class LocalJobExecutorConfigs {

  /** Name under which the local job executor is registered as a built-in job executor. */
  public static final String LOCAL_JOB_EXECUTOR_NAME = "local";

  /** Config key for the capacity of the queue of jobs waiting to be run. */
  public static final String WAITING_QUEUE_SIZE = "waitingQueueSize";

  /** Default capacity of the waiting queue. */
  public static final int DEFAULT_WAITING_QUEUE_SIZE = 100;

  /** Config key for the maximum number of jobs run at the same time. */
  public static final String MAX_RUNNING_JOBS = "maxRunningJobs";

  /** Default maximum running jobs: half the available processors, clamped to [1, 10]. */
  public static final int DEFAULT_MAX_RUNNING_JOBS = defaultMaxRunningJobs();

  /** Config key for how long, in milliseconds, the status of a finished job is kept. */
  public static final String JOB_STATUS_KEEP_TIME_MS = "jobStatusKeepTimeInMs";

  /** Default status keep time: one hour, in milliseconds. */
  public static final long DEFAULT_JOB_STATUS_KEEP_TIME_MS = 60L * 60L * 1000L;

  private LocalJobExecutorConfigs() {
    // Constants holder; never instantiated.
  }

  private static int defaultMaxRunningJobs() {
    int halfOfProcessors = Runtime.getRuntime().availableProcessors() / 2;
    return Math.max(1, Math.min(halfOfProcessors, 10));
  }
}
diff --git
a/core/src/main/java/org/apache/gravitino/job/local/LocalProcessBuilder.java
b/core/src/main/java/org/apache/gravitino/job/local/LocalProcessBuilder.java
new file mode 100644
index 0000000000..4307d62cb0
--- /dev/null
+++ b/core/src/main/java/org/apache/gravitino/job/local/LocalProcessBuilder.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.job.local;
+
+import java.io.File;
+import java.util.Map;
+import org.apache.gravitino.job.JobTemplate;
+import org.apache.gravitino.job.ShellJobTemplate;
+
+public abstract class LocalProcessBuilder {
+
+ protected final JobTemplate jobTemplate;
+
+ protected final File workingDirectory;
+
+ protected LocalProcessBuilder(JobTemplate jobTemplate, Map<String, String>
configs) {
+ this.jobTemplate = jobTemplate;
+ // Executable should be in the working directory, so we can figure out the
working directory
+ // from the executable path.
+ this.workingDirectory = new File(jobTemplate.executable()).getParentFile();
+ }
+
+ public abstract Process start();
+
+ public static LocalProcessBuilder create(JobTemplate jobTemplate,
Map<String, String> configs) {
+ if (jobTemplate instanceof ShellJobTemplate) {
+ return new ShellProcessBuilder((ShellJobTemplate) jobTemplate, configs);
+ } else {
+ throw new IllegalArgumentException("Unsupported job template type: " +
jobTemplate.jobType());
+ }
+ }
+}
diff --git
a/core/src/main/java/org/apache/gravitino/job/local/ShellProcessBuilder.java
b/core/src/main/java/org/apache/gravitino/job/local/ShellProcessBuilder.java
new file mode 100644
index 0000000000..57e3b20c05
--- /dev/null
+++ b/core/src/main/java/org/apache/gravitino/job/local/ShellProcessBuilder.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.job.local;
+
+import com.google.common.collect.Lists;
+import java.io.File;
+import java.util.List;
+import java.util.Map;
+import org.apache.gravitino.job.ShellJobTemplate;
+
+public class ShellProcessBuilder extends LocalProcessBuilder {
+
+ protected ShellProcessBuilder(ShellJobTemplate shellJobTemplate, Map<String,
String> configs) {
+ super(shellJobTemplate, configs);
+ }
+
+ @Override
+ public Process start() {
+ ShellJobTemplate shellJobTemplate = (ShellJobTemplate) jobTemplate;
+ File executableFile = new File(shellJobTemplate.executable());
+ if (!executableFile.canExecute()) {
+ executableFile.setExecutable(true);
+ }
+
+ List<String> commandList =
Lists.newArrayList(shellJobTemplate.executable());
+ commandList.addAll(shellJobTemplate.arguments());
+
+ ProcessBuilder builder = new ProcessBuilder(commandList);
+ builder.directory(workingDirectory);
+ builder.environment().putAll(shellJobTemplate.environments());
+
+ File outputFile = new File(workingDirectory, "output.log");
+ File errorFile = new File(workingDirectory, "error.log");
+ builder.redirectOutput(outputFile);
+ builder.redirectError(errorFile);
+
+ try {
+ return builder.start();
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to start shell process", e);
+ }
+ }
+}
diff --git
a/core/src/test/java/org/apache/gravitino/job/local/TestLocalJobExecutor.java
b/core/src/test/java/org/apache/gravitino/job/local/TestLocalJobExecutor.java
new file mode 100644
index 0000000000..c88c0811df
--- /dev/null
+++
b/core/src/test/java/org/apache/gravitino/job/local/TestLocalJobExecutor.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.job.local;
+
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.nio.file.Files;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.gravitino.connector.job.JobExecutor;
import org.apache.gravitino.job.JobHandle;
import org.apache.gravitino.job.JobManager;
import org.apache.gravitino.job.JobTemplate;
import org.apache.gravitino.meta.AuditInfo;
import org.apache.gravitino.meta.JobTemplateEntity;
import org.apache.gravitino.utils.NamespaceUtil;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.testcontainers.shaded.org.apache.commons.io.FileUtils;
import org.testcontainers.shaded.org.awaitility.Awaitility;
+
+public class TestLocalJobExecutor {
+
+ private static JobExecutor jobExecutor;
+
+ private static JobTemplateEntity jobTemplateEntity;
+
+ private static File workingDir;
+
+ @BeforeAll
+ public static void setUpClass() throws IOException {
+ jobExecutor = new LocalJobExecutor();
+ jobExecutor.initialize(Collections.emptyMap());
+
+ URL testJobScriptUrl =
TestLocalJobExecutor.class.getResource("/test-job.sh");
+ Assertions.assertNotNull(testJobScriptUrl);
+ File testJobScriptFile = new File(testJobScriptUrl.getFile());
+
+ URL commonScriptUrl = TestLocalJobExecutor.class.getResource("/common.sh");
+ Assertions.assertNotNull(commonScriptUrl);
+ File commonScriptFile = new File(commonScriptUrl.getFile());
+
+ JobTemplateEntity.TemplateContent templateContent =
+ JobTemplateEntity.TemplateContent.builder()
+ .withExecutable(testJobScriptFile.getAbsolutePath())
+ .withArguments(Lists.newArrayList("{{arg1}}", "{{arg2}}"))
+ .withEnvironments(ImmutableMap.of("ENV_VAR", "{{var}}"))
+ .withJobType(JobTemplate.JobType.SHELL)
+
.withScripts(Lists.newArrayList(commonScriptFile.getAbsolutePath()))
+ .withCustomFields(Collections.emptyMap())
+ .build();
+ jobTemplateEntity =
+ JobTemplateEntity.builder()
+ .withId(1L)
+ .withName("test-job-template")
+ .withNamespace(NamespaceUtil.ofJobTemplate("test"))
+ .withComment("test")
+ .withTemplateContent(templateContent)
+ .withAuditInfo(AuditInfo.EMPTY)
+ .build();
+ }
+
+ @AfterAll
+ public static void tearDownClass() throws IOException {
+ if (jobExecutor != null) {
+ jobExecutor.close();
+ jobExecutor = null;
+ }
+ }
+
+ @BeforeEach
+ public void setUp() throws IOException {
+ workingDir =
Files.createTempDirectory("gravitino-test-local-job-executor").toFile();
+ }
+
+ @BeforeEach
+ public void tearDown() throws IOException {
+ if (workingDir != null && workingDir.exists()) {
+ FileUtils.deleteDirectory(workingDir);
+ }
+ }
+
+ @Test
+ public void testSubmitJobSuccessfully() throws IOException {
+ Map<String, String> jobConf =
+ ImmutableMap.of(
+ "arg1", "value1",
+ "arg2", "success",
+ "var", "value3");
+
+ JobTemplate template =
+ JobManager.createRuntimeJobTemplate(jobTemplateEntity, jobConf,
workingDir);
+
+ String jobId = jobExecutor.submitJob(template);
+ Assertions.assertNotNull(jobId);
+
+ Awaitility.await()
+ .atMost(10, TimeUnit.SECONDS)
+ .until(() -> jobExecutor.getJobStatus(jobId) ==
JobHandle.Status.SUCCEEDED);
+
+ String output = FileUtils.readFileToString(new File(workingDir,
"output.log"), "UTF-8");
+ Assertions.assertTrue(output.contains("value1"));
+ Assertions.assertTrue(output.contains("success"));
+ Assertions.assertTrue(output.contains("value3"));
+ Assertions.assertTrue(output.contains("in common script"));
+
+ Assertions.assertEquals(JobHandle.Status.SUCCEEDED,
jobExecutor.getJobStatus(jobId));
+ }
+
+ @Test
+ public void testSubmitJobFailure() throws IOException {
+ Map<String, String> jobConf =
+ ImmutableMap.of(
+ "arg1", "value1",
+ "arg2", "fail",
+ "var", "value3");
+
+ JobTemplate template =
+ JobManager.createRuntimeJobTemplate(jobTemplateEntity, jobConf,
workingDir);
+
+ String jobId = jobExecutor.submitJob(template);
+ Assertions.assertNotNull(jobId);
+
+ Awaitility.await()
+ .atMost(10, TimeUnit.SECONDS)
+ .until(() -> jobExecutor.getJobStatus(jobId) ==
JobHandle.Status.FAILED);
+
+ String output = FileUtils.readFileToString(new File(workingDir,
"output.log"), "UTF-8");
+ Assertions.assertTrue(output.contains("value1"));
+ Assertions.assertTrue(output.contains("fail"));
+ Assertions.assertTrue(output.contains("value3"));
+ Assertions.assertTrue(output.contains("in common script"));
+
+ Assertions.assertEquals(JobHandle.Status.FAILED,
jobExecutor.getJobStatus(jobId));
+ }
+
+ @Test
+ public void testCancelJob() throws InterruptedException {
+ Map<String, String> jobConf =
+ ImmutableMap.of(
+ "arg1", "value1",
+ "arg2", "success",
+ "var", "value3");
+
+ JobTemplate template =
+ JobManager.createRuntimeJobTemplate(jobTemplateEntity, jobConf,
workingDir);
+
+ String jobId = jobExecutor.submitJob(template);
+ Assertions.assertNotNull(jobId);
+ // sleep a while to ensure the job is running.
+ Thread.sleep(100);
+
+ jobExecutor.cancelJob(jobId);
+
+ Awaitility.await()
+ .atMost(10, TimeUnit.SECONDS)
+ .until(() -> jobExecutor.getJobStatus(jobId) ==
JobHandle.Status.CANCELLED);
+
+ Assertions.assertEquals(JobHandle.Status.CANCELLED,
jobExecutor.getJobStatus(jobId));
+
+ // Cancelling a job that is already cancelled.
+ Assertions.assertDoesNotThrow(() -> jobExecutor.cancelJob(jobId));
+ Assertions.assertEquals(JobHandle.Status.CANCELLED,
jobExecutor.getJobStatus(jobId));
+ }
+
+ @Test
+ public void testCancelSucceededJob() {
+ // Cancelling a job that is already succeeded.
+ Map<String, String> successJobConf =
+ ImmutableMap.of(
+ "arg1", "value1",
+ "arg2", "success",
+ "var", "value3");
+
+ JobTemplate successTemplate =
+ JobManager.createRuntimeJobTemplate(jobTemplateEntity, successJobConf,
workingDir);
+ String successJobId = jobExecutor.submitJob(successTemplate);
+ Awaitility.await()
+ .atMost(10, TimeUnit.SECONDS)
+ .until(() -> jobExecutor.getJobStatus(successJobId) ==
JobHandle.Status.SUCCEEDED);
+
+ Assertions.assertDoesNotThrow(() -> jobExecutor.cancelJob(successJobId));
+ Assertions.assertEquals(JobHandle.Status.SUCCEEDED,
jobExecutor.getJobStatus(successJobId));
+ }
+
+ @Test
+ public void testCancelFailedJob() {
+ // Cancelling a job that is already failed.
+ Map<String, String> failJobConf =
+ ImmutableMap.of(
+ "arg1", "value1",
+ "arg2", "fail",
+ "var", "value3");
+
+ JobTemplate failTemplate =
+ JobManager.createRuntimeJobTemplate(jobTemplateEntity, failJobConf,
workingDir);
+ String failJobId = jobExecutor.submitJob(failTemplate);
+ Awaitility.await()
+ .atMost(10, TimeUnit.SECONDS)
+ .until(() -> jobExecutor.getJobStatus(failJobId) ==
JobHandle.Status.FAILED);
+
+ Assertions.assertDoesNotThrow(() -> jobExecutor.cancelJob(failJobId));
+ Assertions.assertEquals(JobHandle.Status.FAILED,
jobExecutor.getJobStatus(failJobId));
+ }
+}
diff --git a/core/src/test/resources/common.sh
b/core/src/test/resources/common.sh
new file mode 100755
index 0000000000..4a29c32441
--- /dev/null
+++ b/core/src/test/resources/common.sh
@@ -0,0 +1,20 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
# Sourced by test-job.sh; prints a marker line that the tests look for in the job output.
printf '%s\n' "in common script"
diff --git a/core/src/test/resources/test-job.sh
b/core/src/test/resources/test-job.sh
new file mode 100755
index 0000000000..a7725e3b7b
--- /dev/null
+++ b/core/src/test/resources/test-job.sh
@@ -0,0 +1,47 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
# This script is used to test job submission and execution in a local environment.
# Usage: test-job.sh <arg1> <arg2>
#   <arg1>  echoed to stdout.
#   <arg2>  echoed to stdout; "success" exits 0, "fail" exits 1, anything else exits 2.
# The ENV_VAR environment variable is echoed to stdout as well.
echo "starting test job"

# Resolve the absolute directory of this script so common.sh is sourced from next to it,
# regardless of the current working directory.
bin="$(dirname "${BASH_SOURCE-$0}")"
bin="$(cd "${bin}">/dev/null; pwd)"

. "${bin}/common.sh"

# Keep the job alive long enough for tests to cancel it while it is still running.
sleep 3

JOB_NAME="test_job-$(date +%s)-$1"

echo "Submitting job with name: $JOB_NAME"

echo "$1"

echo "$2"

echo "$ENV_VAR"

if [[ "$2" == "success" ]]; then
  exit 0
elif [[ "$2" == "fail" ]]; then
  exit 1
else
  exit 2
fi