This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new daaa14b591 [#7504] feat(core): Add the job framework for Job System 
(part-1) (#7695)
daaa14b591 is described below

commit daaa14b59161347f340ec18b8f0dcb45094d96c4
Author: Jerry Shao <[email protected]>
AuthorDate: Mon Jul 21 15:17:31 2025 +0800

    [#7504] feat(core): Add the job framework for Job System (part-1) (#7695)
    
    ### What changes were proposed in this pull request?
    
    This is the first part of work to add the job framework for Job System
    in Gravitino.
    
    ### Why are the changes needed?
    
    Fix: #7504
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Add new UTs to cover the code.
---
 .../java/org/apache/gravitino/job/JobHandle.java   |   3 +
 .../java/org/apache/gravitino/job/JobTemplate.java |   5 +
 .../org/apache/gravitino/job/ShellJobTemplate.java |   5 +
 .../org/apache/gravitino/job/SparkJobTemplate.java |  15 +
 .../main/java/org/apache/gravitino/Configs.java    |   7 +
 .../src/main/java/org/apache/gravitino/Entity.java |  41 +-
 .../gravitino/SupportsRelationOperations.java      |   4 +-
 .../java/org/apache/gravitino/job/JobManager.java  | 233 +++++++++++
 .../gravitino/job/JobOperationDispatcher.java      | 101 +++++
 .../java/org/apache/gravitino/meta/JobEntity.java  | 161 ++++++++
 .../apache/gravitino/meta/JobTemplateEntity.java   | 239 +++++++++++
 .../gravitino/storage/relational/JDBCBackend.java  |   8 +
 .../apache/gravitino/utils/NameIdentifierUtil.java |  22 ++
 .../org/apache/gravitino/utils/NamespaceUtil.java  |  21 +
 .../org/apache/gravitino/job/TestJobManager.java   | 438 +++++++++++++++++++++
 15 files changed, 1267 insertions(+), 36 deletions(-)

diff --git a/api/src/main/java/org/apache/gravitino/job/JobHandle.java 
b/api/src/main/java/org/apache/gravitino/job/JobHandle.java
index 815e72ea48..38927538de 100644
--- a/api/src/main/java/org/apache/gravitino/job/JobHandle.java
+++ b/api/src/main/java/org/apache/gravitino/job/JobHandle.java
@@ -24,6 +24,9 @@ package org.apache.gravitino.job;
  */
 public interface JobHandle {
 
+  /** The prefix for job IDs; every job ID returned from Gravitino will be 
like job-uuid. */
+  String JOB_ID_PREFIX = "job-";
+
   /** The status of the job. */
   enum Status {
 
diff --git a/api/src/main/java/org/apache/gravitino/job/JobTemplate.java 
b/api/src/main/java/org/apache/gravitino/job/JobTemplate.java
index 51714b6784..fa6d89f211 100644
--- a/api/src/main/java/org/apache/gravitino/job/JobTemplate.java
+++ b/api/src/main/java/org/apache/gravitino/job/JobTemplate.java
@@ -41,6 +41,11 @@ import org.apache.commons.lang3.StringUtils;
  *   <li>arguments
  *   <li>environments
  * </ul>
+ *
+ * <p>executable is the path to the executable that will be run. It should be 
an absolute path that
+ * can be accessed by the Gravitino server. Currently, Gravitino supports 
executables in the local
+ * file system or on a web server (e.g., HTTP, HTTPS, or FTP). Distributed 
file systems like HDFS
+ * or S3 will be supported in the future.
  */
 public abstract class JobTemplate {
 
diff --git a/api/src/main/java/org/apache/gravitino/job/ShellJobTemplate.java 
b/api/src/main/java/org/apache/gravitino/job/ShellJobTemplate.java
index 566d2b74eb..afb6fdd07d 100644
--- a/api/src/main/java/org/apache/gravitino/job/ShellJobTemplate.java
+++ b/api/src/main/java/org/apache/gravitino/job/ShellJobTemplate.java
@@ -26,6 +26,11 @@ import java.util.Objects;
  * Represents a job template for executing shell scripts. This class extends 
the JobTemplate class
  * and provides functionality specific to shell job templates, including a 
list of scripts to be
  * executed.
+ *
+ * <p>Scripts are a list of shell files that will be leveraged by the 
"executable". Scripts must be
+ * put where the Gravitino server can access them. Currently, Gravitino 
supports scripts in
+ * the local file system or on a web server (e.g., HTTP, HTTPS, or FTP). 
Distributed file systems
+ * like HDFS or S3 will be supported in the future.
  */
 public class ShellJobTemplate extends JobTemplate {
 
diff --git a/api/src/main/java/org/apache/gravitino/job/SparkJobTemplate.java 
b/api/src/main/java/org/apache/gravitino/job/SparkJobTemplate.java
index c70d193ede..a1f68f7e75 100644
--- a/api/src/main/java/org/apache/gravitino/job/SparkJobTemplate.java
+++ b/api/src/main/java/org/apache/gravitino/job/SparkJobTemplate.java
@@ -30,6 +30,21 @@ import org.apache.commons.lang3.StringUtils;
  * Represents a job template for executing Spark applications. This class 
extends the JobTemplate
  * class and provides functionality specific to Spark job templates, including 
the class name, jars,
  * files, archives, and configurations required for the Spark job.
+ *
+ * <p>Take Spark word count job as an example:
+ *
+ * <p>className: "org.apache.spark.examples.JavaWordCount"; executable:
+ * "https://example.com/spark-examples.jar"; arguments: ["{{input_path}}", 
"{{output_path}}"];
+ * configs: {"spark.master": "local[*]", "spark.app.name": "WordCount"}
+ *
+ * <p>configs is a map of configuration parameters that will be used by the 
Spark application. It
+ * can be templated by using placeholders like "{{foo_value}}" and 
"{{bar_value}}". These
+ * placeholders will be replaced with actual values when the job is executed.
+ *
+ * <p>jars, files, and archives are lists of resources that will be used by 
the Spark application.
+ * These resources must be accessible to the Gravitino server, and can be 
located in the local file
+ * system or on a web server (e.g., HTTP, HTTPS, FTP). Distributed file systems 
like HDFS or S3 will
+ * be supported in the future.
  */
 public class SparkJobTemplate extends JobTemplate {
 
diff --git a/core/src/main/java/org/apache/gravitino/Configs.java 
b/core/src/main/java/org/apache/gravitino/Configs.java
index 24d445ccb7..5a6544b1d4 100644
--- a/core/src/main/java/org/apache/gravitino/Configs.java
+++ b/core/src/main/java/org/apache/gravitino/Configs.java
@@ -406,4 +406,11 @@ public class Configs {
           .stringConf()
           .checkValue(StringUtils::isNotBlank, 
ConfigConstants.NOT_BLANK_ERROR_MSG)
           .createWithDefault("caffeine");
+
+  public static final ConfigEntry<String> JOB_STAGING_DIR =
+      new ConfigBuilder("gravitino.job.stagingDir")
+          .doc("Directory for managing staging files when running jobs.")
+          .version(ConfigConstants.VERSION_1_0_0)
+          .stringConf()
+          .createWithDefault("/tmp/gravitino/jobs/staging");
 }
diff --git a/core/src/main/java/org/apache/gravitino/Entity.java 
b/core/src/main/java/org/apache/gravitino/Entity.java
index 355455c7b1..c8d973f8c5 100644
--- a/core/src/main/java/org/apache/gravitino/Entity.java
+++ b/core/src/main/java/org/apache/gravitino/Entity.java
@@ -18,9 +18,7 @@
  */
 package org.apache.gravitino;
 
-import com.google.common.collect.ImmutableList;
 import java.io.Serializable;
-import java.util.List;
 import java.util.Map;
 import lombok.Getter;
 
@@ -58,6 +56,10 @@ public interface Entity extends Serializable {
   /** The policy schema name in the system catalog. */
   String POLICY_SCHEMA_NAME = "policy";
 
+  String JOB_TEMPLATE_SCHEMA_NAME = "job_template";
+
+  String JOB_SCHEMA_NAME = "job";
+
   /** Enumeration defining the types of entities in the Gravitino framework. */
   @Getter
   enum EntityType {
@@ -75,40 +77,9 @@ public interface Entity extends Serializable {
     MODEL,
     MODEL_VERSION,
     POLICY,
+    JOB_TEMPLATE,
+    JOB,
     AUDIT;
-
-    /**
-     * Returns the parent entity types of the given entity type. The parent 
entity types are the
-     * entity types that are higher in the hierarchy than the given entity 
type. For example, the
-     * parent entity types of a table are metalake, catalog, and schema. 
(Sequence: root to leaf)
-     *
-     * @param entityType The entity type for which to get the parent entity 
types.
-     * @return The parent entity types of the given entity type.
-     */
-    public static List<EntityType> getParentEntityTypes(EntityType entityType) 
{
-      switch (entityType) {
-        case METALAKE:
-          return ImmutableList.of();
-        case CATALOG:
-          return ImmutableList.of(METALAKE);
-        case SCHEMA:
-          return ImmutableList.of(METALAKE, CATALOG);
-        case TABLE:
-        case FILESET:
-        case TOPIC:
-        case MODEL:
-        case USER:
-        case GROUP:
-        case ROLE:
-          return ImmutableList.of(METALAKE, CATALOG, SCHEMA);
-        case COLUMN:
-          return ImmutableList.of(METALAKE, CATALOG, SCHEMA, TABLE);
-        case MODEL_VERSION:
-          return ImmutableList.of(METALAKE, CATALOG, SCHEMA, MODEL);
-        default:
-          throw new IllegalArgumentException("Unknown entity type: " + 
entityType);
-      }
-    }
   }
 
   /**
diff --git 
a/core/src/main/java/org/apache/gravitino/SupportsRelationOperations.java 
b/core/src/main/java/org/apache/gravitino/SupportsRelationOperations.java
index dad992412d..22bc08acd5 100644
--- a/core/src/main/java/org/apache/gravitino/SupportsRelationOperations.java
+++ b/core/src/main/java/org/apache/gravitino/SupportsRelationOperations.java
@@ -36,7 +36,9 @@ public interface SupportsRelationOperations {
     /** Role and user relationship */
     ROLE_USER_REL,
     /** Role and group relationship */
-    ROLE_GROUP_REL
+    ROLE_GROUP_REL,
+    /** Job template and job relationship */
+    JOB_TEMPLATE_JOB_REL
   }
 
   /**
diff --git a/core/src/main/java/org/apache/gravitino/job/JobManager.java 
b/core/src/main/java/org/apache/gravitino/job/JobManager.java
new file mode 100644
index 0000000000..09c36cf22f
--- /dev/null
+++ b/core/src/main/java/org/apache/gravitino/job/JobManager.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.job;
+
+import static org.apache.gravitino.metalake.MetalakeManager.checkMetalake;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import org.apache.gravitino.Config;
+import org.apache.gravitino.Configs;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.EntityAlreadyExistsException;
+import org.apache.gravitino.EntityStore;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.SupportsRelationOperations;
+import org.apache.gravitino.exceptions.InUseException;
+import org.apache.gravitino.exceptions.JobTemplateAlreadyExistsException;
+import org.apache.gravitino.exceptions.NoSuchEntityException;
+import org.apache.gravitino.exceptions.NoSuchJobException;
+import org.apache.gravitino.exceptions.NoSuchJobTemplateException;
+import org.apache.gravitino.lock.LockType;
+import org.apache.gravitino.lock.TreeLockUtils;
+import org.apache.gravitino.meta.JobEntity;
+import org.apache.gravitino.meta.JobTemplateEntity;
+import org.apache.gravitino.utils.NameIdentifierUtil;
+import org.apache.gravitino.utils.NamespaceUtil;
+
+public class JobManager implements JobOperationDispatcher, Closeable {
+
+  private final EntityStore entityStore;
+
+  private final File stagingDir;
+
+  public JobManager(Config config, EntityStore entityStore) {
+    this.entityStore = entityStore;
+
+    String stagingDirPath = config.get(Configs.JOB_STAGING_DIR);
+    this.stagingDir = new File(stagingDirPath);
+    if (stagingDir.exists()) {
+      if (!stagingDir.isDirectory()) {
+        throw new IllegalArgumentException(
+            String.format("Staging directory %s exists but is not a 
directory", stagingDirPath));
+      }
+
+      if (!(stagingDir.canExecute() && stagingDir.canRead() && 
stagingDir.canWrite())) {
+        throw new IllegalArgumentException(
+            String.format("Staging directory %s is not accessible", 
stagingDirPath));
+      }
+    } else {
+      if (!stagingDir.mkdirs()) {
+        throw new IllegalArgumentException(
+            String.format("Failed to create staging directory %s", 
stagingDirPath));
+      }
+    }
+  }
+
+  @Override
+  public List<JobTemplateEntity> listJobTemplates(String metalake) {
+    checkMetalake(NameIdentifierUtil.ofMetalake(metalake), entityStore);
+
+    Namespace jobTemplateNs = NamespaceUtil.ofJobTemplate(metalake);
+    return TreeLockUtils.doWithTreeLock(
+        NameIdentifier.of(jobTemplateNs.levels()),
+        LockType.READ,
+        () -> {
+          try {
+            return entityStore.list(
+                jobTemplateNs, JobTemplateEntity.class, 
Entity.EntityType.JOB_TEMPLATE);
+          } catch (IOException ioe) {
+            throw new RuntimeException(ioe);
+          }
+        });
+  }
+
+  @Override
+  public void registerJobTemplate(String metalake, JobTemplateEntity 
jobTemplateEntity)
+      throws JobTemplateAlreadyExistsException {
+    checkMetalake(NameIdentifierUtil.ofMetalake(metalake), entityStore);
+
+    NameIdentifier jobTemplateIdent =
+        NameIdentifierUtil.ofJobTemplate(metalake, jobTemplateEntity.name());
+    TreeLockUtils.doWithTreeLock(
+        jobTemplateIdent,
+        LockType.WRITE,
+        () -> {
+          try {
+            entityStore.put(jobTemplateEntity, false /* overwrite */);
+            return null;
+          } catch (EntityAlreadyExistsException e) {
+            throw new JobTemplateAlreadyExistsException(
+                "Job template with name %s under metalake %s already exists",
+                jobTemplateEntity.name(), metalake);
+          } catch (IOException ioe) {
+            throw new RuntimeException(ioe);
+          }
+        });
+  }
+
+  @Override
+  public JobTemplateEntity getJobTemplate(String metalake, String 
jobTemplateName)
+      throws NoSuchJobTemplateException {
+    checkMetalake(NameIdentifierUtil.ofMetalake(metalake), entityStore);
+
+    NameIdentifier jobTemplateIdent = 
NameIdentifierUtil.ofJobTemplate(metalake, jobTemplateName);
+    return TreeLockUtils.doWithTreeLock(
+        jobTemplateIdent,
+        LockType.READ,
+        () -> {
+          try {
+            return entityStore.get(
+                jobTemplateIdent, Entity.EntityType.JOB_TEMPLATE, 
JobTemplateEntity.class);
+          } catch (NoSuchEntityException e) {
+            throw new NoSuchJobTemplateException(
+                "Job template with name %s under metalake %s does not exist",
+                jobTemplateName, metalake);
+          } catch (IOException ioe) {
+            throw new RuntimeException(ioe);
+          }
+        });
+  }
+
+  @Override
+  public boolean deleteJobTemplate(String metalake, String jobTemplateName) 
throws InUseException {
+    checkMetalake(NameIdentifierUtil.ofMetalake(metalake), entityStore);
+
+    // TODO. Check if there are any running jobs associated with the job 
template. If there are
+    //  running jobs, throw InUseException
+
+    // Delete the job template entity as well as all the jobs associated with 
it.
+    return TreeLockUtils.doWithTreeLock(
+        NameIdentifier.of(NamespaceUtil.ofJobTemplate(metalake).levels()),
+        LockType.WRITE,
+        () -> {
+          try {
+            return entityStore.delete(
+                NameIdentifierUtil.ofJobTemplate(metalake, jobTemplateName),
+                Entity.EntityType.JOB_TEMPLATE);
+          } catch (IOException ioe) {
+            throw new RuntimeException(ioe);
+          }
+        });
+  }
+
+  @Override
+  public List<JobEntity> listJobs(String metalake, Optional<String> 
jobTemplateName)
+      throws NoSuchJobTemplateException {
+    checkMetalake(NameIdentifierUtil.ofMetalake(metalake), entityStore);
+
+    Namespace jobNs = NamespaceUtil.ofJob(metalake);
+    return TreeLockUtils.doWithTreeLock(
+        NameIdentifier.of(jobNs.levels()),
+        LockType.READ,
+        () -> {
+          try {
+            // If jobTemplateName is present, check if the job template 
exists, will throw an
+            // exception if the job template does not exist.
+            jobTemplateName.ifPresent(s -> getJobTemplate(metalake, s));
+
+            List<JobEntity> jobEntities;
+            if (jobTemplateName.isPresent()) {
+              NameIdentifier jobTemplateIdent =
+                  NameIdentifierUtil.ofJobTemplate(metalake, 
jobTemplateName.get());
+
+              // Lock the job template to ensure no concurrent 
modifications/deletions
+              jobEntities =
+                  TreeLockUtils.doWithTreeLock(
+                      jobTemplateIdent,
+                      LockType.READ,
+                      () ->
+                          // List all the jobs associated with the job template
+                          entityStore
+                              .relationOperations()
+                              .listEntitiesByRelation(
+                                  
SupportsRelationOperations.Type.JOB_TEMPLATE_JOB_REL,
+                                  jobTemplateIdent,
+                                  Entity.EntityType.JOB_TEMPLATE));
+            } else {
+              jobEntities = entityStore.list(jobNs, JobEntity.class, 
Entity.EntityType.JOB);
+            }
+            return jobEntities;
+
+          } catch (IOException ioe) {
+            throw new RuntimeException(ioe);
+          }
+        });
+  }
+
+  @Override
+  public JobEntity getJob(String metalake, String jobId) throws 
NoSuchJobException {
+    checkMetalake(NameIdentifierUtil.ofMetalake(metalake), entityStore);
+
+    NameIdentifier jobIdent = NameIdentifierUtil.ofJob(metalake, jobId);
+    return TreeLockUtils.doWithTreeLock(
+        jobIdent,
+        LockType.READ,
+        () -> {
+          try {
+            return entityStore.get(jobIdent, Entity.EntityType.JOB, 
JobEntity.class);
+          } catch (NoSuchEntityException e) {
+            throw new NoSuchJobException(
+                "Job with ID %s under metalake %s does not exist", jobId, 
metalake);
+          } catch (IOException ioe) {
+            throw new RuntimeException(ioe);
+          }
+        });
+  }
+
+  @Override
+  public void close() throws IOException {
+    // TODO. Implement any necessary cleanup logic for the JobManager.
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/job/JobOperationDispatcher.java 
b/core/src/main/java/org/apache/gravitino/job/JobOperationDispatcher.java
new file mode 100644
index 0000000000..42f0d75e05
--- /dev/null
+++ b/core/src/main/java/org/apache/gravitino/job/JobOperationDispatcher.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.job;
+
+import java.util.List;
+import java.util.Optional;
+import org.apache.gravitino.exceptions.InUseException;
+import org.apache.gravitino.exceptions.JobTemplateAlreadyExistsException;
+import org.apache.gravitino.exceptions.NoSuchJobException;
+import org.apache.gravitino.exceptions.NoSuchJobTemplateException;
+import org.apache.gravitino.meta.JobEntity;
+import org.apache.gravitino.meta.JobTemplateEntity;
+
+/** The interface for job operation dispatcher. */
+public interface JobOperationDispatcher {
+
+  /**
+   * Lists all the job templates in the specified metalake.
+   *
+   * @param metalake the name of the metalake
+   * @return a list of job templates
+   */
+  List<JobTemplateEntity> listJobTemplates(String metalake);
+
+  /**
+   * Registers a job template in the specified metalake.
+   *
+   * @param metalake the name of the metalake
+   * @param jobTemplateEntity the job template entity to register
+   * @throws JobTemplateAlreadyExistsException if a job template with the same 
name already exists
+   */
+  void registerJobTemplate(String metalake, JobTemplateEntity 
jobTemplateEntity)
+      throws JobTemplateAlreadyExistsException;
+
+  /**
+   * Retrieves a job template by its name in the specified metalake.
+   *
+   * @param metalake the name of the metalake
+   * @param jobTemplateName the name of the job template to retrieve
+   * @return the job template entity associated with the specified name
+   * @throws NoSuchJobTemplateException if no job template with the specified 
name exists
+   */
+  JobTemplateEntity getJobTemplate(String metalake, String jobTemplateName)
+      throws NoSuchJobTemplateException;
+
+  /**
+   * Deletes a job template by its name in the specified metalake.
+   *
+   * @param metalake the name of the metalake
+   * @param jobTemplateName the name of the job template to delete
+   * @return true if the job template was successfully deleted, false if the 
job template does not
+   *     exist
+   * @throws InUseException if queued or started jobs are still associated 
with the job template
+   */
+  boolean deleteJobTemplate(String metalake, String jobTemplateName) throws 
InUseException;
+
+  /**
+   * Lists all the jobs. If the jobTemplateName is provided, it will list the 
jobs associated with
+   * that job template; if not, it will list all the jobs in the specified 
metalake.
+   *
+   * @param metalake the name of the metalake
+   * @param jobTemplateName the name of the job template to filter jobs by, if 
present
+   * @return a list of job entities
+   * @throws NoSuchJobTemplateException if the job template does not exist
+   */
+  List<JobEntity> listJobs(String metalake, Optional<String> jobTemplateName)
+      throws NoSuchJobTemplateException;
+
+  /**
+   * Retrieves a job by its ID in the specified metalake.
+   *
+   * @param metalake the name of the metalake
+   * @param jobId the ID of the job to retrieve
+   * @return the job entity associated with the specified ID
+   * @throws NoSuchJobException if no job with the specified ID exists
+   */
+  JobEntity getJob(String metalake, String jobId) throws NoSuchJobException;
+
+  // TODO. Implement the runJob and cancelJob methods in another PR.
+  //  JobEntity runJob(String metalake, String jobTemplateName, Map<String, 
String> jobConf)
+  //      throws NoSuchJobTemplateException;
+  //
+  //  JobEntity cancelJob(String metalake, String jobId) throws 
NoSuchJobException;
+}
diff --git a/core/src/main/java/org/apache/gravitino/meta/JobEntity.java 
b/core/src/main/java/org/apache/gravitino/meta/JobEntity.java
new file mode 100644
index 0000000000..4083154685
--- /dev/null
+++ b/core/src/main/java/org/apache/gravitino/meta/JobEntity.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.meta;
+
+import com.google.common.collect.Maps;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import lombok.ToString;
+import org.apache.gravitino.Auditable;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.Field;
+import org.apache.gravitino.HasIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.job.JobHandle;
+
+@ToString
+public class JobEntity implements Entity, Auditable, HasIdentifier {
+
+  public static final Field ID =
+      Field.required("id", Long.class, "The unique id of the job entity.");
+  public static final Field STATUS =
+      Field.required("status", JobHandle.Status.class, "The status of the 
job.");
+  public static final Field TEMPLATE_NAME =
+      Field.required("job_template_name", String.class, "The name of the job 
template.");
+  public static final Field AUDIT_INFO =
+      Field.required(
+          "audit_info", AuditInfo.class, "The audit details of the job 
entity.");
+
+  private Long id;
+  private JobHandle.Status status;
+  private String jobTemplateName;
+  private Namespace namespace;
+  private AuditInfo auditInfo;
+
+  private JobEntity() {}
+
+  @Override
+  public Map<Field, Object> fields() {
+    Map<Field, Object> fields = Maps.newHashMap();
+    fields.put(ID, id);
+    fields.put(TEMPLATE_NAME, jobTemplateName);
+    fields.put(STATUS, status);
+    fields.put(AUDIT_INFO, auditInfo);
+    return Collections.unmodifiableMap(fields);
+  }
+
+  @Override
+  public Long id() {
+    return id;
+  }
+
+  @Override
+  public String name() {
+    return JobHandle.JOB_ID_PREFIX + id;
+  }
+
+  @Override
+  public Namespace namespace() {
+    return namespace;
+  }
+
+  public JobHandle.Status status() {
+    return status;
+  }
+
+  public String jobTemplateName() {
+    return jobTemplateName;
+  }
+
+  @Override
+  public AuditInfo auditInfo() {
+    return auditInfo;
+  }
+
+  @Override
+  public EntityType type() {
+    return EntityType.JOB;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof JobEntity)) {
+      return false;
+    }
+
+    JobEntity that = (JobEntity) o;
+    return Objects.equals(id, that.id)
+        && Objects.equals(status, that.status)
+        && Objects.equals(jobTemplateName, that.jobTemplateName)
+        && Objects.equals(namespace, that.namespace)
+        && Objects.equals(auditInfo, that.auditInfo);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(id, namespace, status, jobTemplateName, auditInfo);
+  }
+
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  public static class Builder {
+    private final JobEntity jobEntity;
+
+    private Builder() {
+      this.jobEntity = new JobEntity();
+    }
+
+    public Builder withId(Long id) {
+      jobEntity.id = id;
+      return this;
+    }
+
+    public Builder withNamespace(Namespace namespace) {
+      jobEntity.namespace = namespace;
+      return this;
+    }
+
+    public Builder withStatus(JobHandle.Status status) {
+      jobEntity.status = status;
+      return this;
+    }
+
+    public Builder withJobTemplateName(String jobTemplateName) {
+      jobEntity.jobTemplateName = jobTemplateName;
+      return this;
+    }
+
+    public Builder withAuditInfo(AuditInfo auditInfo) {
+      jobEntity.auditInfo = auditInfo;
+      return this;
+    }
+
+    public JobEntity build() {
+      jobEntity.validate();
+      return jobEntity;
+    }
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/meta/JobTemplateEntity.java 
b/core/src/main/java/org/apache/gravitino/meta/JobTemplateEntity.java
new file mode 100644
index 0000000000..de030c5d50
--- /dev/null
+++ b/core/src/main/java/org/apache/gravitino/meta/JobTemplateEntity.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.meta;
+
+import com.google.common.collect.Maps;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import lombok.AllArgsConstructor;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.ToString;
+import lombok.experimental.Accessors;
+import org.apache.gravitino.Auditable;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.Field;
+import org.apache.gravitino.HasIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.job.JobTemplate;
+import org.apache.gravitino.job.ShellJobTemplate;
+import org.apache.gravitino.job.SparkJobTemplate;
+
+@ToString
+public class JobTemplateEntity implements Entity, Auditable, HasIdentifier {
+
+  /**
+   * A job template content that will be used internally to maintain all the 
information of a job
+   * template. This class will be serialized and stored in the entity store. 
Internally, we don't
+   * separate the different types of job templates, so this class will be used 
to represent all the
+   * job templates.
+   */
+  @Getter()
+  @Accessors(fluent = true)
+  @ToString
+  @NoArgsConstructor
+  @AllArgsConstructor(access = lombok.AccessLevel.PRIVATE)
+  @EqualsAndHashCode
+  @lombok.Builder(setterPrefix = "with")
+  public static class TemplateContent {
+    // Generic fields for job template content
+    private JobTemplate.JobType jobType;
+    private String executable;
+    private List<String> arguments;
+    private Map<String, String> environments;
+    private Map<String, String> customFields;
+    // Fields for Shell job template content
+    private List<String> scripts;
+    // Fields for Spark job template content
+    private String className;
+    private List<String> jars;
+    private List<String> files;
+    private List<String> archives;
+    private Map<String, String> configs;
+
+    public static TemplateContent fromJobTemplate(JobTemplate jobTemplate) {
+      TemplateContentBuilder builder =
+          TemplateContent.builder()
+              .withJobType(jobTemplate.jobType())
+              .withExecutable(jobTemplate.executable())
+              .withArguments(jobTemplate.arguments())
+              .withEnvironments(jobTemplate.environments())
+              .withCustomFields(jobTemplate.customFields());
+
+      if (jobTemplate instanceof ShellJobTemplate) {
+        ShellJobTemplate shellJobTemplate = (ShellJobTemplate) jobTemplate;
+        builder.withScripts(shellJobTemplate.scripts());
+
+      } else if (jobTemplate instanceof SparkJobTemplate) {
+        SparkJobTemplate sparkJobTemplate = (SparkJobTemplate) jobTemplate;
+        builder
+            .withClassName(sparkJobTemplate.className())
+            .withJars(sparkJobTemplate.jars())
+            .withFiles(sparkJobTemplate.files())
+            .withArchives(sparkJobTemplate.archives())
+            .withConfigs(sparkJobTemplate.configs());
+      }
+
+      return builder.build();
+    }
+  }
+
+  // Field descriptors of the job template entity. ID, NAME, TEMPLATE_CONTENT and AUDIT_INFO are
+  // declared as required; COMMENT is declared as optional.
+  public static final Field ID =
+      Field.required("id", Long.class, "The unique id of the job template 
entity.");
+  public static final Field NAME =
+      Field.required("name", String.class, "The name of the job template 
entity.");
+  public static final Field COMMENT =
+      Field.optional(
+          "comment", String.class, "The comment or description of the job 
template entity.");
+  public static final Field TEMPLATE_CONTENT =
+      Field.required(
+          "template_content",
+          TemplateContent.class,
+          "The template content of the job template entity.");
+  public static final Field AUDIT_INFO =
+      Field.required(
+          "audit_info", AuditInfo.class, "The audit details of the job 
template entity.");
+
+  // Entity state, assigned through the nested Builder. The namespace is carried by the entity
+  // and takes part in equals()/hashCode(), but it is not one of the entries reported by fields().
+  private Long id;
+  private String name;
+  private Namespace namespace;
+  private String comment;
+  private TemplateContent templateContent;
+  private AuditInfo auditInfo;
+
+  // Private on purpose: instances are obtained via builder().
+  private JobTemplateEntity() {}
+
+  /**
+   * Returns the declared fields of this entity together with their current values.
+   *
+   * @return An unmodifiable map from each {@link Field} descriptor to its value.
+   */
+  @Override
+  public Map<Field, Object> fields() {
+    // Collect into a mutable map first, then expose a read-only view of it.
+    Map<Field, Object> fieldValues = Maps.newHashMap();
+    fieldValues.put(ID, this.id);
+    fieldValues.put(NAME, this.name);
+    fieldValues.put(COMMENT, this.comment);
+    fieldValues.put(TEMPLATE_CONTENT, this.templateContent);
+    fieldValues.put(AUDIT_INFO, this.auditInfo);
+    return Collections.unmodifiableMap(fieldValues);
+  }
+
+  /** @return The unique id of this job template entity. */
+  @Override
+  public Long id() {
+    return this.id;
+  }
+
+  /** @return The name of this job template entity. */
+  @Override
+  public String name() {
+    return this.name;
+  }
+
+  /** @return The namespace this job template entity belongs to. */
+  @Override
+  public Namespace namespace() {
+    return this.namespace;
+  }
+
+  /** @return The comment of this job template entity; the field is declared optional. */
+  public String comment() {
+    return this.comment;
+  }
+
+  /** @return The template content held by this job template entity. */
+  public TemplateContent templateContent() {
+    return this.templateContent;
+  }
+
+  /** @return The audit details of this job template entity. */
+  @Override
+  public AuditInfo auditInfo() {
+    return this.auditInfo;
+  }
+
+  /** @return The entity type, always {@link EntityType#JOB_TEMPLATE}. */
+  @Override
+  public EntityType type() {
+    return EntityType.JOB_TEMPLATE;
+  }
+
+  /**
+   * Compares this entity with another object. Two job template entities are equal when their id,
+   * name, namespace, comment, template content and audit info are all equal.
+   *
+   * @param o The object to compare with.
+   * @return {@code true} if {@code o} is a {@link JobTemplateEntity} with equal state.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (o == this) {
+      return true;
+    }
+
+    if (o instanceof JobTemplateEntity) {
+      JobTemplateEntity other = (JobTemplateEntity) o;
+      return Objects.equals(id, other.id)
+          && Objects.equals(name, other.name)
+          && Objects.equals(namespace, other.namespace)
+          && Objects.equals(comment, other.comment)
+          && Objects.equals(templateContent, other.templateContent)
+          && Objects.equals(auditInfo, other.auditInfo);
+    }
+
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(id, name, namespace, comment, templateContent, 
auditInfo);
+  }
+
+  /**
+   * Creates a new {@link Builder} for assembling a {@link JobTemplateEntity}.
+   *
+   * @return A fresh builder instance.
+   */
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  /**
+   * Builder for {@link JobTemplateEntity}.
+   *
+   * <p>The builder populates a single entity instance created together with the builder; {@link
+   * #build()} validates that instance and returns it.
+   */
+  public static class Builder {
+    // The entity under construction; every with* method writes straight into it.
+    private final JobTemplateEntity entity = new JobTemplateEntity();
+
+    private Builder() {}
+
+    /**
+     * @param id The unique id of the job template entity.
+     * @return This builder.
+     */
+    public Builder withId(Long id) {
+      entity.id = id;
+      return this;
+    }
+
+    /**
+     * @param name The name of the job template entity.
+     * @return This builder.
+     */
+    public Builder withName(String name) {
+      entity.name = name;
+      return this;
+    }
+
+    /**
+     * @param namespace The namespace of the job template entity.
+     * @return This builder.
+     */
+    public Builder withNamespace(Namespace namespace) {
+      entity.namespace = namespace;
+      return this;
+    }
+
+    /**
+     * @param comment The comment or description of the job template entity.
+     * @return This builder.
+     */
+    public Builder withComment(String comment) {
+      entity.comment = comment;
+      return this;
+    }
+
+    /**
+     * @param templateContent The template content of the job template entity.
+     * @return This builder.
+     */
+    public Builder withTemplateContent(TemplateContent templateContent) {
+      entity.templateContent = templateContent;
+      return this;
+    }
+
+    /**
+     * @param auditInfo The audit details of the job template entity.
+     * @return This builder.
+     */
+    public Builder withAuditInfo(AuditInfo auditInfo) {
+      entity.auditInfo = auditInfo;
+      return this;
+    }
+
+    /**
+     * Validates the populated entity and returns it.
+     *
+     * @return The validated {@link JobTemplateEntity}.
+     */
+    public JobTemplateEntity build() {
+      entity.validate();
+      return entity;
+    }
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java 
b/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java
index a51e306310..966b4e3250 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java
@@ -353,6 +353,12 @@ public class JDBCBackend implements RelationalBackend {
         return ModelVersionMetaService.getInstance()
             .deleteModelVersionMetasByLegacyTimeline(
                 legacyTimeline, GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT);
+      case JOB_TEMPLATE:
+        // TODO: Implement hard delete logic for job templates.
+        return 0;
+      case JOB:
+        // TODO: Implement hard delete logic for jobs.
+        return 0;
       case AUDIT:
         return 0;
         // TODO: Implement hard delete logic for these entity types.
@@ -380,6 +386,8 @@ public class JDBCBackend implements RelationalBackend {
       case TAG:
       case MODEL:
       case MODEL_VERSION:
+      case JOB_TEMPLATE:
+      case JOB:
         // These entity types have not implemented multi-versions, so we can 
skip.
         return 0;
 
diff --git 
a/core/src/main/java/org/apache/gravitino/utils/NameIdentifierUtil.java 
b/core/src/main/java/org/apache/gravitino/utils/NameIdentifierUtil.java
index e556cbce9f..10d7ec4f13 100644
--- a/core/src/main/java/org/apache/gravitino/utils/NameIdentifierUtil.java
+++ b/core/src/main/java/org/apache/gravitino/utils/NameIdentifierUtil.java
@@ -279,6 +279,28 @@ public class NameIdentifierUtil {
         alias);
   }
 
+  /**
+   * Create the job template {@link NameIdentifier} with the given metalake and job template
+   * name. The identifier is placed under the namespace returned by {@link
+   * NamespaceUtil#ofJobTemplate(String)}.
+   *
+   * @param metalake The metalake name
+   * @param jobTemplateName The job template name
+   * @return The created job template {@link NameIdentifier}
+   */
+  public static NameIdentifier ofJobTemplate(String metalake, String 
jobTemplateName) {
+    return NameIdentifier.of(NamespaceUtil.ofJobTemplate(metalake), 
jobTemplateName);
+  }
+
+  /**
+   * Create the job {@link NameIdentifier} with the given metalake and job name. The identifier
+   * is placed under the namespace returned by {@link NamespaceUtil#ofJob(String)}.
+   *
+   * @param metalake The metalake name
+   * @param jobName The job name
+   * @return The created job {@link NameIdentifier}
+   */
+  public static NameIdentifier ofJob(String metalake, String jobName) {
+    return NameIdentifier.of(NamespaceUtil.ofJob(metalake), jobName);
+  }
+
   /**
    * Try to get the catalog {@link NameIdentifier} from the given {@link 
NameIdentifier}.
    *
diff --git a/core/src/main/java/org/apache/gravitino/utils/NamespaceUtil.java 
b/core/src/main/java/org/apache/gravitino/utils/NamespaceUtil.java
index 63743c635b..c294a90145 100644
--- a/core/src/main/java/org/apache/gravitino/utils/NamespaceUtil.java
+++ b/core/src/main/java/org/apache/gravitino/utils/NamespaceUtil.java
@@ -185,6 +185,27 @@ public class NamespaceUtil {
     return Namespace.of(metalake, catalog, schema, model);
   }
 
+  /**
+   * Create a namespace for job template.
+   *
+   * @param metalake The metalake name
+   * @return A namespace for job template
+   */
+  public static Namespace ofJobTemplate(String metalake) {
+    return Namespace.of(
+        metalake, Entity.SYSTEM_CATALOG_RESERVED_NAME, 
Entity.JOB_TEMPLATE_SCHEMA_NAME);
+  }
+
+  /**
+   * Create a namespace for job.
+   *
+   * @param metalake The metalake name
+   * @return A namespace for job
+   */
+  public static Namespace ofJob(String metalake) {
+    return Namespace.of(metalake, Entity.SYSTEM_CATALOG_RESERVED_NAME, 
Entity.JOB_SCHEMA_NAME);
+  }
+
   /**
    * Convert a model name identifier to a model version namespace.
    *
diff --git a/core/src/test/java/org/apache/gravitino/job/TestJobManager.java 
b/core/src/test/java/org/apache/gravitino/job/TestJobManager.java
new file mode 100644
index 0000000000..079f84aec3
--- /dev/null
+++ b/core/src/test/java/org/apache/gravitino/job/TestJobManager.java
@@ -0,0 +1,438 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.job;
+
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.Lists;
+import java.io.File;
+import java.io.IOException;
+import java.time.Instant;
+import java.util.List;
+import java.util.Optional;
+import java.util.Random;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.gravitino.Config;
+import org.apache.gravitino.Configs;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.EntityAlreadyExistsException;
+import org.apache.gravitino.EntityStore;
+import org.apache.gravitino.GravitinoEnv;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.SupportsRelationOperations;
+import org.apache.gravitino.exceptions.JobTemplateAlreadyExistsException;
+import org.apache.gravitino.exceptions.MetalakeInUseException;
+import org.apache.gravitino.exceptions.NoSuchEntityException;
+import org.apache.gravitino.exceptions.NoSuchJobException;
+import org.apache.gravitino.exceptions.NoSuchJobTemplateException;
+import org.apache.gravitino.exceptions.NoSuchMetalakeException;
+import org.apache.gravitino.lock.LockManager;
+import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.meta.JobEntity;
+import org.apache.gravitino.meta.JobTemplateEntity;
+import org.apache.gravitino.metalake.MetalakeManager;
+import org.apache.gravitino.utils.NameIdentifierUtil;
+import org.apache.gravitino.utils.NamespaceUtil;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+
+public class TestJobManager {
+
+  // Mockito spy wrapped around a real JobManager; created once in setUp().
+  private static JobManager jobManager;
+
+  // Mockito mock standing in for the entity store; reset after every test.
+  private static EntityStore entityStore;
+
+  // Configuration handed to the LockManager and the JobManager in setUp().
+  private static Config config;
+
+  // Relative staging directory with a random numeric suffix; deleted in tearDown().
+  private static String testStagingDir;
+
+  // Name of the metalake used by every test case.
+  private static String metalake = "test_metalake";
+
+  // Identifier of the test metalake, used when stubbing MetalakeManager.checkMetalake.
+  private static NameIdentifier metalakeIdent = NameIdentifier.of(metalake);
+
+  // Static mock of MetalakeManager; opened in setUp() and closed in tearDown().
+  private static MockedStatic<MetalakeManager> mockedMetalake;
+
+  /**
+   * Prepares the shared fixtures: a config pointing at a randomly named staging directory, a
+   * lock manager injected into {@link GravitinoEnv}, a mocked entity store, a spied {@link
+   * JobManager} and the static mock of {@link MetalakeManager}.
+   */
+  @BeforeAll
+  public static void setUp() throws IllegalAccessException {
+    testStagingDir = "test_staging_dir_" + new Random().nextInt(1000);
+    config = new Config(false) {};
+    config.set(Configs.JOB_STAGING_DIR, testStagingDir);
+
+    // GravitinoEnv exposes no setter here, so the lock manager is written reflectively.
+    LockManager lockManager = new LockManager(config);
+    FieldUtils.writeField(GravitinoEnv.getInstance(), "lockManager", lockManager, true);
+
+    entityStore = Mockito.mock(EntityStore.class);
+    jobManager = Mockito.spy(new JobManager(config, entityStore));
+    mockedMetalake = mockStatic(MetalakeManager.class);
+  }
+
+  /**
+   * Releases the shared fixtures: closes the job manager, removes the staging directory and
+   * closes the static mock of {@link MetalakeManager}.
+   *
+   * @throws Exception If closing the job manager or deleting the staging directory fails.
+   */
+  @AfterAll
+  public static void tearDown() throws Exception {
+    try {
+      // Clean up resources if necessary
+      jobManager.close();
+      FileUtils.deleteDirectory(new File(testStagingDir));
+    } finally {
+      // The static mock is registered per thread; always close it, even when the cleanup above
+      // fails, so the registration does not leak into other test classes run on this thread.
+      mockedMetalake.close();
+    }
+  }
+
+  /**
+   * Clears all stubbing after each test so that the test cases do not influence each other.
+   */
+  @AfterEach
+  public void reset() {
+    // Reset the mocked static methods after each test
+    mockedMetalake.reset();
+    Mockito.reset(entityStore);
+    // The job manager is a shared spy that some tests stub (e.g. getJobTemplate); drop those
+    // stubs as well, otherwise they leak into later tests and make results order-dependent.
+    Mockito.reset(jobManager);
+  }
+
+  /**
+   * Verifies {@code listJobTemplates}: the templates returned by the entity store are listed,
+   * metalake check failures are propagated, and an {@link IOException} from the entity store
+   * surfaces as a {@link RuntimeException}.
+   */
+  @Test
+  public void testListJobTemplates() throws IOException {
+    mockedMetalake
+        .when(() -> MetalakeManager.checkMetalake(metalakeIdent, entityStore))
+        .thenAnswer(a -> null);
+    JobTemplateEntity shellJobTemplate =
+        newShellJobTemplateEntity("shell_job", "A shell job template");
+    JobTemplateEntity sparkJobTemplate =
+        newSparkJobTemplateEntity("spark_job", "A spark job template");
+
+    when(entityStore.list(
+            NamespaceUtil.ofJobTemplate(metalake),
+            JobTemplateEntity.class,
+            Entity.EntityType.JOB_TEMPLATE))
+        .thenReturn(Lists.newArrayList(shellJobTemplate, sparkJobTemplate));
+
+    List<JobTemplateEntity> templates = jobManager.listJobTemplates(metalake);
+    Assertions.assertEquals(2, templates.size());
+    Assertions.assertTrue(templates.contains(shellJobTemplate));
+    Assertions.assertTrue(templates.contains(sparkJobTemplate));
+
+    // Throw exception if metalake does not exist
+    mockedMetalake
+        .when(() -> MetalakeManager.checkMetalake(NameIdentifier.of("non_existent"), entityStore))
+        .thenThrow(new NoSuchMetalakeException("Metalake does not exist"));
+
+    Exception e =
+        Assertions.assertThrows(
+            NoSuchMetalakeException.class, () -> jobManager.listJobTemplates("non_existent"));
+    Assertions.assertEquals("Metalake does not exist", e.getMessage());
+
+    // Throw exception if entity store fails. This runs while the metalake check for the test
+    // metalake still succeeds; once that check is stubbed to throw MetalakeInUseException (a
+    // RuntimeException), this assertion could pass without the entity store being involved.
+    doThrow(new IOException("Entity store error"))
+        .when(entityStore)
+        .list(
+            NamespaceUtil.ofJobTemplate(metalake),
+            JobTemplateEntity.class,
+            Entity.EntityType.JOB_TEMPLATE);
+
+    Assertions.assertThrows(RuntimeException.class, () -> jobManager.listJobTemplates(metalake));
+
+    // Throw exception if metalake is in use
+    mockedMetalake
+        .when(() -> MetalakeManager.checkMetalake(metalakeIdent, entityStore))
+        .thenThrow(new MetalakeInUseException("Metalake is in use"));
+
+    e =
+        Assertions.assertThrows(
+            MetalakeInUseException.class, () -> jobManager.listJobTemplates(metalake));
+    Assertions.assertEquals("Metalake is in use", e.getMessage());
+  }
+
+  /**
+   * Verifies {@code registerJobTemplate}: a new template is stored, an already existing template
+   * is reported as {@link JobTemplateAlreadyExistsException}, metalake check failures are
+   * propagated, and an {@link IOException} from the entity store surfaces as a {@link
+   * RuntimeException}.
+   */
+  @Test
+  public void testRegisterJobTemplate() throws IOException {
+    mockedMetalake
+        .when(() -> MetalakeManager.checkMetalake(metalakeIdent, entityStore))
+        .thenAnswer(a -> null);
+
+    JobTemplateEntity shellJobTemplate =
+        newShellJobTemplateEntity("shell_job", "A shell job template");
+    doNothing().when(entityStore).put(shellJobTemplate, false);
+
+    // Register a new job template
+    Assertions.assertDoesNotThrow(() -> jobManager.registerJobTemplate(metalake, shellJobTemplate));
+
+    // Throw exception if job template already exists
+    doThrow(new EntityAlreadyExistsException("Job template already exists"))
+        .when(entityStore)
+        .put(shellJobTemplate, false /* overwrite */);
+
+    Exception e =
+        Assertions.assertThrows(
+            JobTemplateAlreadyExistsException.class,
+            () -> jobManager.registerJobTemplate(metalake, shellJobTemplate));
+    Assertions.assertEquals(
+        "Job template with name shell_job under metalake test_metalake already exists",
+        e.getMessage());
+
+    // Throw exception if metalake does not exist
+    mockedMetalake
+        .when(() -> MetalakeManager.checkMetalake(NameIdentifier.of("non_existent"), entityStore))
+        .thenThrow(new NoSuchMetalakeException("Metalake does not exist"));
+
+    e =
+        Assertions.assertThrows(
+            NoSuchMetalakeException.class,
+            () -> jobManager.registerJobTemplate("non_existent", shellJobTemplate));
+    Assertions.assertEquals("Metalake does not exist", e.getMessage());
+
+    // Throw exception if entity store fails. This runs while the metalake check for the test
+    // metalake still succeeds; once that check is stubbed to throw MetalakeInUseException (a
+    // RuntimeException), this assertion could pass without the entity store being involved.
+    doThrow(new IOException("Entity store error"))
+        .when(entityStore)
+        .put(shellJobTemplate, false /* overwrite */);
+
+    Assertions.assertThrows(
+        RuntimeException.class, () -> jobManager.registerJobTemplate(metalake, shellJobTemplate));
+
+    // Throw exception if metalake is in use
+    mockedMetalake
+        .when(() -> MetalakeManager.checkMetalake(metalakeIdent, entityStore))
+        .thenThrow(new MetalakeInUseException("Metalake is in use"));
+
+    e =
+        Assertions.assertThrows(
+            MetalakeInUseException.class,
+            () -> jobManager.registerJobTemplate(metalake, shellJobTemplate));
+    Assertions.assertEquals("Metalake is in use", e.getMessage());
+  }
+
+  /**
+   * Verifies {@code getJobTemplate}: an existing template is returned, a missing entity is
+   * reported as {@link NoSuchJobTemplateException} with a descriptive message, and an {@link
+   * IOException} from the entity store surfaces as a {@link RuntimeException}.
+   */
+  @Test
+  public void testGetJobTemplate() throws IOException {
+    // Let the metalake check pass for the test metalake.
+    mockedMetalake
+        .when(() -> MetalakeManager.checkMetalake(metalakeIdent, entityStore))
+        .thenAnswer(a -> null);
+
+    JobTemplateEntity shellJobTemplate =
+        newShellJobTemplateEntity("shell_job", "A shell job template");
+    when(entityStore.get(
+            NameIdentifierUtil.ofJobTemplate(metalake, 
shellJobTemplate.name()),
+            Entity.EntityType.JOB_TEMPLATE,
+            JobTemplateEntity.class))
+        .thenReturn(shellJobTemplate);
+
+    // Get an existing job template
+    JobTemplateEntity retrievedTemplate = jobManager.getJobTemplate(metalake, 
"shell_job");
+    Assertions.assertEquals(shellJobTemplate, retrievedTemplate);
+
+    // Throw exception if job template does not exist
+    when(entityStore.get(
+            NameIdentifierUtil.ofJobTemplate(metalake, "non_existent"),
+            Entity.EntityType.JOB_TEMPLATE,
+            JobTemplateEntity.class))
+        .thenThrow(new NoSuchEntityException("Job template does not exist"));
+
+    Exception e =
+        Assertions.assertThrows(
+            NoSuchJobTemplateException.class,
+            () -> jobManager.getJobTemplate(metalake, "non_existent"));
+    // The asserted message differs from the one carried by the stubbed NoSuchEntityException.
+    Assertions.assertEquals(
+        "Job template with name non_existent under metalake test_metalake does 
not exist",
+        e.getMessage());
+
+    // Throw exception if entity store fails
+    doThrow(new IOException("Entity store error"))
+        .when(entityStore)
+        .get(
+            NameIdentifierUtil.ofJobTemplate(metalake, "job"),
+            Entity.EntityType.JOB_TEMPLATE,
+            JobTemplateEntity.class);
+    Assertions.assertThrows(
+        RuntimeException.class, () -> jobManager.getJobTemplate(metalake, 
"job"));
+  }
+
+  /**
+   * Verifies {@code deleteJobTemplate}: the boolean returned by the entity store is passed
+   * through, and an {@link IOException} from the entity store surfaces as a {@link
+   * RuntimeException}.
+   */
+  @Test
+  public void testDeleteJobTemplate() throws IOException {
+    // Let the metalake check pass for the test metalake.
+    mockedMetalake
+        .when(() -> MetalakeManager.checkMetalake(metalakeIdent, entityStore))
+        .thenAnswer(a -> null);
+
+    JobTemplateEntity shellJobTemplate =
+        newShellJobTemplateEntity("shell_job", "A shell job template");
+    doReturn(true)
+        .when(entityStore)
+        .delete(
+            NameIdentifierUtil.ofJobTemplate(metalake, 
shellJobTemplate.name()),
+            Entity.EntityType.JOB_TEMPLATE);
+
+    // Delete an existing job template
+    Assertions.assertTrue(() -> jobManager.deleteJobTemplate(metalake, 
"shell_job"));
+
+    // Re-stub the same identifier so the store now reports that nothing was deleted.
+    doReturn(false)
+        .when(entityStore)
+        .delete(
+            NameIdentifierUtil.ofJobTemplate(metalake, "shell_job"),
+            Entity.EntityType.JOB_TEMPLATE);
+
+    // Delete a non-existing job template
+    Assertions.assertFalse(() -> jobManager.deleteJobTemplate(metalake, 
"shell_job"));
+
+    // Throw exception if entity store fails
+    doThrow(new IOException("Entity store error"))
+        .when(entityStore)
+        .delete(NameIdentifierUtil.ofJobTemplate(metalake, "job"), 
Entity.EntityType.JOB_TEMPLATE);
+    Assertions.assertThrows(
+        RuntimeException.class, () -> jobManager.deleteJobTemplate(metalake, 
"job"));
+  }
+
+  /**
+   * Verifies {@code listJobs}: jobs are filtered by job template when a template name is given,
+   * all jobs are returned otherwise, a missing job template is reported, and an {@link
+   * IOException} from the entity store surfaces as a {@link RuntimeException}.
+   */
+  @Test
+  public void testListJobs() throws IOException {
+    mockedMetalake
+        .when(() -> MetalakeManager.checkMetalake(metalakeIdent, entityStore))
+        .thenAnswer(a -> null);
+
+    JobTemplateEntity shellJobTemplate =
+        newShellJobTemplateEntity("shell_job", "A shell job template");
+    // jobManager is a spy: stub with doReturn(..).when(spy) so that the real getJobTemplate is
+    // not executed while the stub is being set up.
+    doReturn(shellJobTemplate).when(jobManager).getJobTemplate(metalake, shellJobTemplate.name());
+
+    JobEntity job1 = newJobEntity("shell_job");
+    JobEntity job2 = newJobEntity("spark_job");
+
+    // Jobs related to the shell job template are served by the relation operations.
+    SupportsRelationOperations supportsRelationOperations =
+        Mockito.mock(SupportsRelationOperations.class);
+    when(supportsRelationOperations.listEntitiesByRelation(
+            SupportsRelationOperations.Type.JOB_TEMPLATE_JOB_REL,
+            NameIdentifierUtil.ofJobTemplate(metalake, shellJobTemplate.name()),
+            Entity.EntityType.JOB_TEMPLATE))
+        .thenReturn(Lists.newArrayList(job1));
+    when(entityStore.relationOperations()).thenReturn(supportsRelationOperations);
+
+    // Stub the entity store to return every job of the metalake, regardless of job template
+    when(entityStore.list(NamespaceUtil.ofJob(metalake), JobEntity.class, Entity.EntityType.JOB))
+        .thenReturn(Lists.newArrayList(job1, job2));
+
+    List<JobEntity> jobs = jobManager.listJobs(metalake, Optional.of(shellJobTemplate.name()));
+    Assertions.assertEquals(1, jobs.size());
+    Assertions.assertTrue(jobs.contains(job1));
+    Assertions.assertFalse(jobs.contains(job2));
+
+    // List all jobs without filtering by job template
+    jobs = jobManager.listJobs(metalake, Optional.empty());
+    Assertions.assertEquals(2, jobs.size());
+    Assertions.assertTrue(jobs.contains(job1));
+    Assertions.assertTrue(jobs.contains(job2));
+
+    // Throw exception if job template does not exist
+    doThrow(new NoSuchJobTemplateException("Job template does not exist"))
+        .when(jobManager)
+        .getJobTemplate(metalake, "non_existent");
+
+    Exception e =
+        Assertions.assertThrows(
+            NoSuchJobTemplateException.class,
+            () -> jobManager.listJobs(metalake, Optional.of("non_existent")));
+    Assertions.assertEquals("Job template does not exist", e.getMessage());
+
+    // Throw exception if entity store fails
+    doThrow(new IOException("Entity store error"))
+        .when(entityStore)
+        .list(NamespaceUtil.ofJob(metalake), JobEntity.class, Entity.EntityType.JOB);
+    Assertions.assertThrows(
+        RuntimeException.class, () -> jobManager.listJobs(metalake, Optional.empty()));
+  }
+
+  /**
+   * Verifies {@code getJob}: an existing job is returned, a missing entity is reported as {@link
+   * NoSuchJobException} with a descriptive message, and an {@link IOException} from the entity
+   * store surfaces as a {@link RuntimeException}.
+   */
+  @Test
+  public void testGetJob() throws IOException {
+    // Let the metalake check pass for the test metalake.
+    mockedMetalake
+        .when(() -> MetalakeManager.checkMetalake(metalakeIdent, entityStore))
+        .thenAnswer(a -> null);
+
+    JobEntity job = newJobEntity("shell_job");
+    when(entityStore.get(
+            NameIdentifierUtil.ofJob(metalake, job.name()), 
Entity.EntityType.JOB, JobEntity.class))
+        .thenReturn(job);
+
+    // Get an existing job
+    JobEntity retrievedJob = jobManager.getJob(metalake, job.name());
+    Assertions.assertEquals(job, retrievedJob);
+
+    // Throw exception if job does not exist
+    when(entityStore.get(
+            NameIdentifierUtil.ofJob(metalake, "non_existent"),
+            Entity.EntityType.JOB,
+            JobEntity.class))
+        .thenThrow(new NoSuchEntityException("Job does not exist"));
+
+    Exception e =
+        Assertions.assertThrows(
+            NoSuchJobException.class, () -> jobManager.getJob(metalake, 
"non_existent"));
+    // The asserted message differs from the one carried by the stubbed NoSuchEntityException.
+    Assertions.assertEquals(
+        "Job with ID non_existent under metalake test_metalake does not 
exist", e.getMessage());
+
+    // Throw exception if entity store fails
+    doThrow(new IOException("Entity store error"))
+        .when(entityStore)
+        .get(NameIdentifierUtil.ofJob(metalake, "job"), Entity.EntityType.JOB, 
JobEntity.class);
+    Assertions.assertThrows(RuntimeException.class, () -> 
jobManager.getJob(metalake, "job"));
+  }
+
+  /**
+   * Creates a shell job template entity for tests, with a random id, the job template namespace
+   * of the test metalake and {@code /bin/echo} as the executable.
+   *
+   * @param name The name of the job template.
+   * @param comment The comment of the job template; also stored on the returned entity.
+   * @return The job template entity.
+   */
+  private static JobTemplateEntity newShellJobTemplateEntity(String name, String comment) {
+    ShellJobTemplate shellJobTemplate =
+        new ShellJobTemplate.Builder()
+            .withName(name)
+            .withComment(comment)
+            .withExecutable("/bin/echo")
+            .build();
+
+    Random rand = new Random();
+    return JobTemplateEntity.builder()
+        .withId(rand.nextLong())
+        .withName(name)
+        .withNamespace(NamespaceUtil.ofJobTemplate(metalake))
+        // Propagate the comment so the entity agrees with the template it was built from.
+        .withComment(comment)
+        .withTemplateContent(JobTemplateEntity.TemplateContent.fromJobTemplate(shellJobTemplate))
+        .withAuditInfo(
+            AuditInfo.builder().withCreator("test").withCreateTime(Instant.now()).build())
+        .build();
+  }
+
+  /**
+   * Creates a Spark job template entity for tests, with a random id, the job template namespace
+   * of the test metalake, the SparkPi example class and a local example jar as the executable.
+   *
+   * @param name The name of the job template.
+   * @param comment The comment of the job template; also stored on the returned entity.
+   * @return The job template entity.
+   */
+  private static JobTemplateEntity newSparkJobTemplateEntity(String name, String comment) {
+    SparkJobTemplate sparkJobTemplate =
+        new SparkJobTemplate.Builder()
+            .withName(name)
+            .withComment(comment)
+            .withClassName("org.apache.spark.examples.SparkPi")
+            .withExecutable("local:///path/to/spark-examples.jar")
+            .build();
+
+    Random rand = new Random();
+    return JobTemplateEntity.builder()
+        .withId(rand.nextLong())
+        .withName(name)
+        .withNamespace(NamespaceUtil.ofJobTemplate(metalake))
+        // Propagate the comment so the entity agrees with the template it was built from.
+        .withComment(comment)
+        .withTemplateContent(JobTemplateEntity.TemplateContent.fromJobTemplate(sparkJobTemplate))
+        .withAuditInfo(
+            AuditInfo.builder().withCreator("test").withCreateTime(Instant.now()).build())
+        .build();
+  }
+
+  /**
+   * Creates a queued job entity for tests, with a random id and the job namespace of the test
+   * metalake.
+   *
+   * @param templateName The name of the job template the job refers to.
+   * @return The job entity.
+   */
+  private static JobEntity newJobEntity(String templateName) {
+    long jobId = new Random().nextLong();
+    AuditInfo auditInfo =
+        AuditInfo.builder().withCreator("test").withCreateTime(Instant.now()).build();
+
+    return JobEntity.builder()
+        .withId(jobId)
+        .withNamespace(NamespaceUtil.ofJob(metalake))
+        .withJobTemplateName(templateName)
+        .withStatus(JobHandle.Status.QUEUED)
+        .withAuditInfo(auditInfo)
+        .build();
+  }
+}

Reply via email to