This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new 75b00ef224 [#7758] feat(server): Add REST API for job system (part-1) 
(#7923)
75b00ef224 is described below

commit 75b00ef224460b65ee98979db6c6c5b552fa041d
Author: Jerry Shao <[email protected]>
AuthorDate: Tue Aug 5 20:17:24 2025 +0800

    [#7758] feat(server): Add REST API for job system (part-1) (#7923)
    
    ### What changes were proposed in this pull request?
    
    Add the job template related REST APIs for job system.
    
    ### Why are the changes needed?
    
    related to #7758
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    UTs to cover the code.
---
 .../apache/gravitino/dto/job/JobTemplateDTO.java   | 136 ++++++
 .../gravitino/dto/job/ShellJobTemplateDTO.java     |  45 ++
 .../gravitino/dto/job/SparkJobTemplateDTO.java     |  68 +++
 .../dto/requests/JobTemplateRegisterRequest.java   |  60 +++
 .../dto/responses/JobTemplateListResponse.java     |  56 +++
 .../dto/responses/JobTemplateResponse.java         |  57 +++
 .../apache/gravitino/dto/util/DTOConverters.java   |  46 ++
 .../gravitino/dto/job/TestJobTemplateDTO.java      | 440 +++++++++++++++++
 .../requests/TestJobTemplateRegisterRequest.java   |  83 ++++
 .../java/org/apache/gravitino/GravitinoEnv.java    |  30 +-
 .../java/org/apache/gravitino/job/JobManager.java  |   3 +-
 .../gravitino/job/JobOperationDispatcher.java      |   3 +-
 .../postgresql/JobMetaPostgreSQLProvider.java      |   2 +-
 .../apache/gravitino/server/GravitinoServer.java   |   2 +
 .../server/web/rest/ExceptionHandlers.java         |  47 ++
 .../gravitino/server/web/rest/JobOperations.java   | 253 ++++++++++
 .../server/web/rest/TestJobOperations.java         | 532 +++++++++++++++++++++
 17 files changed, 1857 insertions(+), 6 deletions(-)

diff --git 
a/common/src/main/java/org/apache/gravitino/dto/job/JobTemplateDTO.java 
b/common/src/main/java/org/apache/gravitino/dto/job/JobTemplateDTO.java
new file mode 100644
index 0000000000..a906276f1c
--- /dev/null
+++ b/common/src/main/java/org/apache/gravitino/dto/job/JobTemplateDTO.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.dto.job;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.ToString;
+import lombok.experimental.Accessors;
+import lombok.experimental.SuperBuilder;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.dto.AuditDTO;
+import org.apache.gravitino.job.JobTemplate;
+
+/** Represents a Job Template Data Transfer Object (DTO). */
+@JsonTypeInfo(
+    use = JsonTypeInfo.Id.NAME,
+    include = JsonTypeInfo.As.EXISTING_PROPERTY,
+    property = "jobType",
+    visible = true)
+@JsonSubTypes({
+  @JsonSubTypes.Type(value = SparkJobTemplateDTO.class, name = "spark"),
+  @JsonSubTypes.Type(value = ShellJobTemplateDTO.class, name = "shell"),
+})
+@Getter
+@Accessors(fluent = true)
+@EqualsAndHashCode
+@SuperBuilder(setterPrefix = "with")
+@ToString
+public abstract class JobTemplateDTO {
+
+  @JsonDeserialize(using = JobTemplateDTO.JobTypeDeserializer.class)
+  @JsonSerialize(using = JobTemplateDTO.JobTypeSerializer.class)
+  @JsonProperty("jobType")
+  private JobTemplate.JobType jobType;
+
+  @JsonProperty("name")
+  private String name;
+
+  @JsonProperty("comment")
+  private String comment;
+
+  @JsonProperty("executable")
+  private String executable;
+
+  @JsonProperty("arguments")
+  private List<String> arguments;
+
+  @JsonProperty("environments")
+  private Map<String, String> environments;
+
+  @JsonProperty("customFields")
+  private Map<String, String> customFields;
+
+  @JsonProperty("audit")
+  private AuditDTO audit;
+
+  /**
+   * Default constructor for Jackson. This constructor is required for 
deserialization of the DTO.
+   */
+  protected JobTemplateDTO() {
+    // Default constructor for Jackson
+  }
+
+  /** Validates the JobTemplateDTO. Ensures that required fields are not null 
or empty. */
+  public void validate() {
+    Preconditions.checkArgument(jobType != null, "\"jobType\" is required and 
cannot be null");
+    Preconditions.checkArgument(
+        StringUtils.isNotBlank(name), "\"name\" is required and cannot be 
empty");
+    Preconditions.checkArgument(
+        StringUtils.isNotBlank(executable), "\"executable\" is required and 
cannot be empty");
+  }
+
+  /** Deserializer for JobTemplate.JobType. */
+  public static class JobTypeDeserializer extends 
JsonDeserializer<JobTemplate.JobType> {
+
+    @Override
+    public JobTemplate.JobType deserialize(JsonParser p, 
DeserializationContext ctxt)
+        throws IOException {
+      String value = p.getText();
+      if (value != null) {
+        try {
+          return JobTemplate.JobType.valueOf(value.toUpperCase());
+        } catch (IllegalArgumentException e) {
+          throw ctxt.weirdStringException(
+              value, JobTemplate.JobType.class, "Invalid jobType value");
+        }
+      }
+
+      throw ctxt.weirdStringException(
+          null, JobTemplate.JobType.class, "jobType cannot be null or empty");
+    }
+  }
+
+  /** Serializer for JobTemplate.JobType. */
+  public static class JobTypeSerializer extends 
JsonSerializer<JobTemplate.JobType> {
+
+    @Override
+    public void serialize(
+        JobTemplate.JobType value, JsonGenerator gen, SerializerProvider 
serializers)
+        throws IOException {
+      Preconditions.checkArgument(value != null, "\"jobType\" cannot be null");
+      gen.writeString(value.name().toLowerCase());
+    }
+  }
+}
diff --git 
a/common/src/main/java/org/apache/gravitino/dto/job/ShellJobTemplateDTO.java 
b/common/src/main/java/org/apache/gravitino/dto/job/ShellJobTemplateDTO.java
new file mode 100644
index 0000000000..45efed2928
--- /dev/null
+++ b/common/src/main/java/org/apache/gravitino/dto/job/ShellJobTemplateDTO.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.dto.job;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.List;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.ToString;
+import lombok.experimental.Accessors;
+import lombok.experimental.SuperBuilder;
+
+/** Represents a Shell Job Template Data Transfer Object (DTO). */
+@Getter
+@Accessors(fluent = true)
+@EqualsAndHashCode(callSuper = true)
+@SuperBuilder(setterPrefix = "with")
+@ToString(callSuper = true)
+public class ShellJobTemplateDTO extends JobTemplateDTO {
+
+  @JsonProperty("scripts")
+  private List<String> scripts;
+
+  /** Creates a new ShellJobTemplateDTO with the specified properties. */
+  private ShellJobTemplateDTO() {
+    // Default constructor for Jackson
+    super();
+  }
+}
diff --git 
a/common/src/main/java/org/apache/gravitino/dto/job/SparkJobTemplateDTO.java 
b/common/src/main/java/org/apache/gravitino/dto/job/SparkJobTemplateDTO.java
new file mode 100644
index 0000000000..7b492639e8
--- /dev/null
+++ b/common/src/main/java/org/apache/gravitino/dto/job/SparkJobTemplateDTO.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.dto.job;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import java.util.List;
+import java.util.Map;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.ToString;
+import lombok.experimental.Accessors;
+import lombok.experimental.SuperBuilder;
+import org.apache.commons.lang3.StringUtils;
+
+/** Represents a Spark Job Template Data Transfer Object (DTO). */
+@Getter
+@Accessors(fluent = true)
+@EqualsAndHashCode(callSuper = true)
+@SuperBuilder(setterPrefix = "with")
+@ToString(callSuper = true)
+public class SparkJobTemplateDTO extends JobTemplateDTO {
+
+  @JsonProperty("className")
+  private String className;
+
+  @JsonProperty("jars")
+  private List<String> jars;
+
+  @JsonProperty("files")
+  private List<String> files;
+
+  @JsonProperty("archives")
+  private List<String> archives;
+
+  @JsonProperty("configs")
+  private Map<String, String> configs;
+
+  /** Creates a new SparkJobTemplateDTO with the specified properties. */
+  private SparkJobTemplateDTO() {
+    // Default constructor for Jackson
+    super();
+  }
+
+  @Override
+  public void validate() {
+    super.validate();
+
+    Preconditions.checkArgument(
+        StringUtils.isNotBlank(className), "\"className\" is required and 
cannot be empty");
+  }
+}
diff --git 
a/common/src/main/java/org/apache/gravitino/dto/requests/JobTemplateRegisterRequest.java
 
b/common/src/main/java/org/apache/gravitino/dto/requests/JobTemplateRegisterRequest.java
new file mode 100644
index 0000000000..f0ccd7869e
--- /dev/null
+++ 
b/common/src/main/java/org/apache/gravitino/dto/requests/JobTemplateRegisterRequest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.dto.requests;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import org.apache.gravitino.dto.job.JobTemplateDTO;
+import org.apache.gravitino.rest.RESTRequest;
+
+/** Represents a request to register a job template. */
+@Getter
+@EqualsAndHashCode
+public class JobTemplateRegisterRequest implements RESTRequest {
+
+  @JsonProperty("jobTemplate")
+  private final JobTemplateDTO jobTemplate;
+
+  /**
+   * Creates a new JobTemplateRegisterRequest.
+   *
+   * @param jobTemplate The job template to register.
+   */
+  public JobTemplateRegisterRequest(JobTemplateDTO jobTemplate) {
+    this.jobTemplate = jobTemplate;
+  }
+
+  /** This is the constructor that is used by Jackson deserializer */
+  private JobTemplateRegisterRequest() {
+    this(null);
+  }
+
+  /**
+   * Validates the request.
+   *
+   * @throws IllegalArgumentException If the request is invalid, this 
exception is thrown.
+   */
+  @Override
+  public void validate() throws IllegalArgumentException {
+    Preconditions.checkArgument(jobTemplate != null, "\"jobTemplate\" must not 
be null");
+    jobTemplate.validate();
+  }
+}
diff --git 
a/common/src/main/java/org/apache/gravitino/dto/responses/JobTemplateListResponse.java
 
b/common/src/main/java/org/apache/gravitino/dto/responses/JobTemplateListResponse.java
new file mode 100644
index 0000000000..00a0d7ac8b
--- /dev/null
+++ 
b/common/src/main/java/org/apache/gravitino/dto/responses/JobTemplateListResponse.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.dto.responses;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import java.util.List;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import org.apache.gravitino.dto.job.JobTemplateDTO;
+
+/** Represents a response containing a list of job templates. */
+@Getter
+@EqualsAndHashCode(callSuper = true)
+public class JobTemplateListResponse extends BaseResponse {
+
+  @JsonProperty("jobTemplates")
+  private final List<JobTemplateDTO> jobTemplates;
+
+  /**
+   * Creates a new JobTemplateListResponse with the specified list of job 
templates.
+   *
+   * @param jobTemplates The list of job templates to include in the response.
+   */
+  public JobTemplateListResponse(List<JobTemplateDTO> jobTemplates) {
+    super(0);
+    this.jobTemplates = jobTemplates;
+  }
+
+  /** Default constructor for Jackson deserialization. */
+  private JobTemplateListResponse() {
+    this(null);
+  }
+
+  @Override
+  public void validate() throws IllegalArgumentException {
+    super.validate();
+    Preconditions.checkArgument(jobTemplates != null, "jobTemplates must not 
be null");
+  }
+}
diff --git 
a/common/src/main/java/org/apache/gravitino/dto/responses/JobTemplateResponse.java
 
b/common/src/main/java/org/apache/gravitino/dto/responses/JobTemplateResponse.java
new file mode 100644
index 0000000000..afa57d0e5f
--- /dev/null
+++ 
b/common/src/main/java/org/apache/gravitino/dto/responses/JobTemplateResponse.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.dto.responses;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import org.apache.gravitino.dto.job.JobTemplateDTO;
+
+/** Represents a response containing a single job template. */
+@Getter
+@EqualsAndHashCode(callSuper = true)
+public class JobTemplateResponse extends BaseResponse {
+
+  @JsonProperty("jobTemplate")
+  private final JobTemplateDTO jobTemplate;
+
+  /**
+   * Creates a new JobTemplateResponse with the specified job template.
+   *
+   * @param jobTemplate The job template to include in the response.
+   */
+  public JobTemplateResponse(JobTemplateDTO jobTemplate) {
+    super(0);
+    this.jobTemplate = jobTemplate;
+  }
+
+  /** Default constructor for Jackson deserialization. */
+  private JobTemplateResponse() {
+    super();
+    this.jobTemplate = null;
+  }
+
+  @Override
+  public void validate() throws IllegalArgumentException {
+    super.validate();
+    Preconditions.checkArgument(jobTemplate != null, "jobTemplate must not be 
null");
+    jobTemplate.validate();
+  }
+}
diff --git 
a/common/src/main/java/org/apache/gravitino/dto/util/DTOConverters.java 
b/common/src/main/java/org/apache/gravitino/dto/util/DTOConverters.java
index e60f5102f1..7c12cf29a8 100644
--- a/common/src/main/java/org/apache/gravitino/dto/util/DTOConverters.java
+++ b/common/src/main/java/org/apache/gravitino/dto/util/DTOConverters.java
@@ -51,6 +51,9 @@ import org.apache.gravitino.dto.authorization.UserDTO;
 import org.apache.gravitino.dto.credential.CredentialDTO;
 import org.apache.gravitino.dto.file.FileInfoDTO;
 import org.apache.gravitino.dto.file.FilesetDTO;
+import org.apache.gravitino.dto.job.JobTemplateDTO;
+import org.apache.gravitino.dto.job.ShellJobTemplateDTO;
+import org.apache.gravitino.dto.job.SparkJobTemplateDTO;
 import org.apache.gravitino.dto.messaging.TopicDTO;
 import org.apache.gravitino.dto.model.ModelDTO;
 import org.apache.gravitino.dto.model.ModelVersionDTO;
@@ -83,6 +86,9 @@ import org.apache.gravitino.dto.tag.MetadataObjectDTO;
 import org.apache.gravitino.dto.tag.TagDTO;
 import org.apache.gravitino.file.FileInfo;
 import org.apache.gravitino.file.Fileset;
+import org.apache.gravitino.job.JobTemplate;
+import org.apache.gravitino.job.ShellJobTemplate;
+import org.apache.gravitino.job.SparkJobTemplate;
 import org.apache.gravitino.messaging.Topic;
 import org.apache.gravitino.model.Model;
 import org.apache.gravitino.model.ModelVersion;
@@ -1142,4 +1148,44 @@ public class DTOConverters {
       return Privileges.deny(privilegeDTO.name());
     }
   }
+
+  /**
+   * Converts a JobTemplateDTO to a JobTemplate.
+   *
+   * @param jobTemplateDTO The job template DTO to be converted.
+   * @return The job template.
+   */
+  public static JobTemplate fromDTO(JobTemplateDTO jobTemplateDTO) {
+    switch (jobTemplateDTO.jobType()) {
+      case SHELL:
+        return ShellJobTemplate.builder()
+            .withName(jobTemplateDTO.name())
+            .withComment(jobTemplateDTO.comment())
+            .withExecutable(jobTemplateDTO.executable())
+            .withArguments(jobTemplateDTO.arguments())
+            .withEnvironments(jobTemplateDTO.environments())
+            .withCustomFields(jobTemplateDTO.customFields())
+            .withScripts(((ShellJobTemplateDTO) jobTemplateDTO).scripts())
+            .build();
+
+      case SPARK:
+        return SparkJobTemplate.builder()
+            .withName(jobTemplateDTO.name())
+            .withComment(jobTemplateDTO.comment())
+            .withExecutable(jobTemplateDTO.executable())
+            .withArguments(jobTemplateDTO.arguments())
+            .withEnvironments(jobTemplateDTO.environments())
+            .withCustomFields(jobTemplateDTO.customFields())
+            .withClassName(((SparkJobTemplateDTO) jobTemplateDTO).className())
+            .withJars(((SparkJobTemplateDTO) jobTemplateDTO).jars())
+            .withFiles(((SparkJobTemplateDTO) jobTemplateDTO).files())
+            .withArchives(((SparkJobTemplateDTO) jobTemplateDTO).archives())
+            .withConfigs(((SparkJobTemplateDTO) jobTemplateDTO).configs())
+            .build();
+
+      default:
+        throw new IllegalArgumentException(
+            "Unsupported job template type: " + jobTemplateDTO.jobType());
+    }
+  }
 }
diff --git 
a/common/src/test/java/org/apache/gravitino/dto/job/TestJobTemplateDTO.java 
b/common/src/test/java/org/apache/gravitino/dto/job/TestJobTemplateDTO.java
new file mode 100644
index 0000000000..3e85d35c1f
--- /dev/null
+++ b/common/src/test/java/org/apache/gravitino/dto/job/TestJobTemplateDTO.java
@@ -0,0 +1,440 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.dto.job;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import java.time.Instant;
+import org.apache.gravitino.dto.AuditDTO;
+import org.apache.gravitino.job.JobTemplate;
+import org.apache.gravitino.json.JsonUtils;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class TestJobTemplateDTO {
+
+  @Test
+  public void testShellJobTemplateDTO() throws JsonProcessingException {
+    JobTemplateDTO shellJobTemplateDTO =
+        ShellJobTemplateDTO.builder()
+            .withJobType(JobTemplate.JobType.SHELL)
+            .withName("testShellJob")
+            .withComment("This is a test shell job template")
+            .withExecutable("/path/to/shell")
+            .withArguments(Lists.newArrayList("arg1", "arg2"))
+            .withEnvironments(ImmutableMap.of("ENV_VAR", "value"))
+            .withCustomFields(ImmutableMap.of("customField1", "value1"))
+            .withScripts(Lists.newArrayList("/path/to/script1.sh", 
"/path/to/script2.sh"))
+            
.withAudit(AuditDTO.builder().withCreator("test").withCreateTime(Instant.now()).build())
+            .build();
+
+    Assertions.assertDoesNotThrow(shellJobTemplateDTO::validate);
+
+    String serJson = 
JsonUtils.objectMapper().writeValueAsString(shellJobTemplateDTO);
+    JobTemplateDTO deserJobTemplateDTO =
+        JsonUtils.objectMapper().readValue(serJson, JobTemplateDTO.class);
+    Assertions.assertEquals(shellJobTemplateDTO, deserJobTemplateDTO);
+
+    // Test comment is null
+    ShellJobTemplateDTO nullCommentJobTemplateDTO =
+        ShellJobTemplateDTO.builder()
+            .withJobType(JobTemplate.JobType.SHELL)
+            .withName("testShellJobNullComment")
+            .withExecutable("/path/to/shell")
+            .withArguments(Lists.newArrayList("arg1", "arg2"))
+            .withEnvironments(ImmutableMap.of("ENV_VAR", "value"))
+            .withCustomFields(ImmutableMap.of("customField1", "value1"))
+            .withScripts(Lists.newArrayList("/path/to/script1.sh", 
"/path/to/script2.sh"))
+            
.withAudit(AuditDTO.builder().withCreator("test").withCreateTime(Instant.now()).build())
+            .build();
+
+    Assertions.assertDoesNotThrow(nullCommentJobTemplateDTO::validate);
+    String nullCommentSerJson =
+        JsonUtils.objectMapper().writeValueAsString(nullCommentJobTemplateDTO);
+    JobTemplateDTO nullCommentDeserJobTemplateDTO =
+        JsonUtils.objectMapper().readValue(nullCommentSerJson, 
JobTemplateDTO.class);
+    Assertions.assertEquals(nullCommentJobTemplateDTO, 
nullCommentDeserJobTemplateDTO);
+
+    // Test arguments are null
+    ShellJobTemplateDTO nullArgumentsJobTemplateDTO =
+        ShellJobTemplateDTO.builder()
+            .withJobType(JobTemplate.JobType.SHELL)
+            .withName("testShellJobNullArguments")
+            .withExecutable("/path/to/shell")
+            .withEnvironments(ImmutableMap.of("ENV_VAR", "value"))
+            .withCustomFields(ImmutableMap.of("customField1", "value1"))
+            .withScripts(Lists.newArrayList("/path/to/script1.sh", 
"/path/to/script2.sh"))
+            
.withAudit(AuditDTO.builder().withCreator("test").withCreateTime(Instant.now()).build())
+            .build();
+
+    Assertions.assertDoesNotThrow(nullArgumentsJobTemplateDTO::validate);
+    String nullArgumentsSerJson =
+        
JsonUtils.objectMapper().writeValueAsString(nullArgumentsJobTemplateDTO);
+    JobTemplateDTO nullArgumentsDeserJobTemplateDTO =
+        JsonUtils.objectMapper().readValue(nullArgumentsSerJson, 
JobTemplateDTO.class);
+    Assertions.assertEquals(nullArgumentsJobTemplateDTO, 
nullArgumentsDeserJobTemplateDTO);
+
+    // Test environments are null
+    ShellJobTemplateDTO nullEnvironmentsJobTemplateDTO =
+        ShellJobTemplateDTO.builder()
+            .withJobType(JobTemplate.JobType.SHELL)
+            .withName("testShellJobNullEnvironments")
+            .withExecutable("/path/to/shell")
+            .withCustomFields(ImmutableMap.of("customField1", "value1"))
+            .withScripts(Lists.newArrayList("/path/to/script1.sh", 
"/path/to/script2.sh"))
+            
.withAudit(AuditDTO.builder().withCreator("test").withCreateTime(Instant.now()).build())
+            .build();
+
+    Assertions.assertDoesNotThrow(nullEnvironmentsJobTemplateDTO::validate);
+    String nullEnvironmentsSerJson =
+        
JsonUtils.objectMapper().writeValueAsString(nullEnvironmentsJobTemplateDTO);
+    JobTemplateDTO nullEnvironmentsDeserJobTemplateDTO =
+        JsonUtils.objectMapper().readValue(nullEnvironmentsSerJson, 
JobTemplateDTO.class);
+    Assertions.assertEquals(nullEnvironmentsJobTemplateDTO, 
nullEnvironmentsDeserJobTemplateDTO);
+
+    // Test custom fields are null
+    ShellJobTemplateDTO nullCustomFieldsJobTemplateDTO =
+        ShellJobTemplateDTO.builder()
+            .withJobType(JobTemplate.JobType.SHELL)
+            .withName("testShellJobNullCustomFields")
+            .withExecutable("/path/to/shell")
+            .withScripts(Lists.newArrayList("/path/to/script1.sh", 
"/path/to/script2.sh"))
+            
.withAudit(AuditDTO.builder().withCreator("test").withCreateTime(Instant.now()).build())
+            .build();
+
+    Assertions.assertDoesNotThrow(nullCustomFieldsJobTemplateDTO::validate);
+    String nullCustomFieldsSerJson =
+        
JsonUtils.objectMapper().writeValueAsString(nullCustomFieldsJobTemplateDTO);
+    JobTemplateDTO nullCustomFieldsDeserJobTemplateDTO =
+        JsonUtils.objectMapper().readValue(nullCustomFieldsSerJson, 
JobTemplateDTO.class);
+    Assertions.assertEquals(nullCustomFieldsJobTemplateDTO, 
nullCustomFieldsDeserJobTemplateDTO);
+
+    // Test scripts are null
+    ShellJobTemplateDTO nullScriptsJobTemplateDTO =
+        ShellJobTemplateDTO.builder()
+            .withJobType(JobTemplate.JobType.SHELL)
+            .withName("testShellJobNullScripts")
+            .withExecutable("/path/to/shell")
+            
.withAudit(AuditDTO.builder().withCreator("test").withCreateTime(Instant.now()).build())
+            .build();
+
+    Assertions.assertDoesNotThrow(nullScriptsJobTemplateDTO::validate);
+    String nullScriptsSerJson =
+        JsonUtils.objectMapper().writeValueAsString(nullScriptsJobTemplateDTO);
+    JobTemplateDTO nullScriptsDeserJobTemplateDTO =
+        JsonUtils.objectMapper().readValue(nullScriptsSerJson, 
JobTemplateDTO.class);
+    Assertions.assertEquals(nullScriptsJobTemplateDTO, 
nullScriptsDeserJobTemplateDTO);
+
+    // Test name is null
+    Assertions.assertThrows(
+        IllegalArgumentException.class,
+        () -> {
+          ShellJobTemplateDTO template =
+              ShellJobTemplateDTO.builder()
+                  .withJobType(JobTemplate.JobType.SHELL)
+                  .withExecutable("/path/to/shell")
+                  .withAudit(
+                      
AuditDTO.builder().withCreator("test").withCreateTime(Instant.now()).build())
+                  .build();
+          template.validate();
+        },
+        "\"name\" is required and cannot be empty");
+
+    // Test executable is null
+    Assertions.assertThrows(
+        IllegalArgumentException.class,
+        () -> {
+          ShellJobTemplateDTO template =
+              ShellJobTemplateDTO.builder()
+                  .withJobType(JobTemplate.JobType.SHELL)
+                  .withName("testShellJobNullExecutable")
+                  .withAudit(
+                      
AuditDTO.builder().withCreator("test").withCreateTime(Instant.now()).build())
+                  .build();
+          template.validate();
+        },
+        "\"executable\" is required and cannot be empty");
+
+    // Test jobType is null
+    Assertions.assertThrows(
+        IllegalArgumentException.class,
+        () -> {
+          ShellJobTemplateDTO template =
+              ShellJobTemplateDTO.builder()
+                  .withName("testShellJobNullJobType")
+                  .withExecutable("/path/to/shell")
+                  .withAudit(
+                      
AuditDTO.builder().withCreator("test").withCreateTime(Instant.now()).build())
+                  .build();
+          template.validate();
+        },
+        "\"jobType\" is required and cannot be null");
+  }
+
+  @Test
+  public void testSparkJobTemplateDTO() throws JsonProcessingException {
+    JobTemplateDTO sparkJobTemplateDTO =
+        SparkJobTemplateDTO.builder()
+            .withJobType(JobTemplate.JobType.SPARK)
+            .withName("testSparkJob")
+            .withComment("This is a test spark job template")
+            .withExecutable("/path/to/spark-submit")
+            .withArguments(Lists.newArrayList("--class", "com.example.Main"))
+            .withEnvironments(ImmutableMap.of("SPARK_ENV_VAR", "value"))
+            .withCustomFields(ImmutableMap.of("customField1", "value1"))
+            .withClassName("com.example.Main")
+            .withJars(Lists.newArrayList("/path/to/jar1.jar", 
"/path/to/jar2.jar"))
+            .withFiles(Lists.newArrayList("/path/to/file1.txt", 
"/path/to/file2.txt"))
+            .withArchives(Lists.newArrayList("/path/to/archive1.zip", 
"/path/to/archive2.zip"))
+            .withConfigs(ImmutableMap.of("spark.executor.memory", "2g"))
+            
.withAudit(AuditDTO.builder().withCreator("test").withCreateTime(Instant.now()).build())
+            .build();
+
+    Assertions.assertDoesNotThrow(sparkJobTemplateDTO::validate);
+
+    String serJson = 
JsonUtils.objectMapper().writeValueAsString(sparkJobTemplateDTO);
+    JobTemplateDTO deserJobTemplateDTO =
+        JsonUtils.objectMapper().readValue(serJson, JobTemplateDTO.class);
+    Assertions.assertEquals(sparkJobTemplateDTO, deserJobTemplateDTO);
+
+    // Test comment is null
+    SparkJobTemplateDTO nullCommentJobTemplateDTO =
+        SparkJobTemplateDTO.builder()
+            .withJobType(JobTemplate.JobType.SPARK)
+            .withName("testSparkJobNullComment")
+            .withExecutable("/path/to/spark-submit")
+            .withArguments(Lists.newArrayList("--class", "com.example.Main"))
+            .withEnvironments(ImmutableMap.of("SPARK_ENV_VAR", "value"))
+            .withCustomFields(ImmutableMap.of("customField1", "value1"))
+            .withClassName("com.example.Main")
+            .withJars(Lists.newArrayList("/path/to/jar1.jar", 
"/path/to/jar2.jar"))
+            .withFiles(Lists.newArrayList("/path/to/file1.txt", 
"/path/to/file2.txt"))
+            .withArchives(Lists.newArrayList("/path/to/archive1.zip", 
"/path/to/archive2.zip"))
+            .withConfigs(ImmutableMap.of("spark.executor.memory", "2g"))
+            
.withAudit(AuditDTO.builder().withCreator("test").withCreateTime(Instant.now()).build())
+            .build();
+
+    Assertions.assertDoesNotThrow(nullCommentJobTemplateDTO::validate);
+    String nullCommentSerJson =
+        JsonUtils.objectMapper().writeValueAsString(nullCommentJobTemplateDTO);
+    JobTemplateDTO nullCommentDeserJobTemplateDTO =
+        JsonUtils.objectMapper().readValue(nullCommentSerJson, 
JobTemplateDTO.class);
+    Assertions.assertEquals(nullCommentJobTemplateDTO, 
nullCommentDeserJobTemplateDTO);
+
+    // Test arguments are null
+    SparkJobTemplateDTO nullArgumentsJobTemplateDTO =
+        SparkJobTemplateDTO.builder()
+            .withJobType(JobTemplate.JobType.SPARK)
+            .withName("testSparkJobNullArguments")
+            .withExecutable("/path/to/spark-submit")
+            .withEnvironments(ImmutableMap.of("SPARK_ENV_VAR", "value"))
+            .withCustomFields(ImmutableMap.of("customField1", "value1"))
+            .withClassName("com.example.Main")
+            .withJars(Lists.newArrayList("/path/to/jar1.jar", 
"/path/to/jar2.jar"))
+            .withFiles(Lists.newArrayList("/path/to/file1.txt", 
"/path/to/file2.txt"))
+            .withArchives(Lists.newArrayList("/path/to/archive1.zip", 
"/path/to/archive2.zip"))
+            .withConfigs(ImmutableMap.of("spark.executor.memory", "2g"))
+            
.withAudit(AuditDTO.builder().withCreator("test").withCreateTime(Instant.now()).build())
+            .build();
+
+    Assertions.assertDoesNotThrow(nullArgumentsJobTemplateDTO::validate);
+    String nullArgumentsSerJson =
+        
JsonUtils.objectMapper().writeValueAsString(nullArgumentsJobTemplateDTO);
+    JobTemplateDTO nullArgumentsDeserJobTemplateDTO =
+        JsonUtils.objectMapper().readValue(nullArgumentsSerJson, 
JobTemplateDTO.class);
+    Assertions.assertEquals(nullArgumentsJobTemplateDTO, 
nullArgumentsDeserJobTemplateDTO);
+
+    // Test environments are null
+    SparkJobTemplateDTO nullEnvironmentsJobTemplateDTO =
+        SparkJobTemplateDTO.builder()
+            .withJobType(JobTemplate.JobType.SPARK)
+            .withName("testSparkJobNullEnvironments")
+            .withExecutable("/path/to/spark-submit")
+            .withCustomFields(ImmutableMap.of("customField1", "value1"))
+            .withClassName("com.example.Main")
+            .withJars(Lists.newArrayList("/path/to/jar1.jar", 
"/path/to/jar2.jar"))
+            .withFiles(Lists.newArrayList("/path/to/file1.txt", 
"/path/to/file2.txt"))
+            .withArchives(Lists.newArrayList("/path/to/archive1.zip", 
"/path/to/archive2.zip"))
+            .withConfigs(ImmutableMap.of("spark.executor.memory", "2g"))
+            
.withAudit(AuditDTO.builder().withCreator("test").withCreateTime(Instant.now()).build())
+            .build();
+
+    Assertions.assertDoesNotThrow(nullEnvironmentsJobTemplateDTO::validate);
+    String nullEnvironmentsSerJson =
+        
JsonUtils.objectMapper().writeValueAsString(nullEnvironmentsJobTemplateDTO);
+    JobTemplateDTO nullEnvironmentsDeserJobTemplateDTO =
+        JsonUtils.objectMapper().readValue(nullEnvironmentsSerJson, 
JobTemplateDTO.class);
+    Assertions.assertEquals(nullEnvironmentsJobTemplateDTO, 
nullEnvironmentsDeserJobTemplateDTO);
+
+    // Test custom fields are null
+    SparkJobTemplateDTO nullCustomFieldsJobTemplateDTO =
+        SparkJobTemplateDTO.builder()
+            .withJobType(JobTemplate.JobType.SPARK)
+            .withName("testSparkJobNullCustomFields")
+            .withExecutable("/path/to/spark-submit")
+            .withClassName("com.example.Main")
+            .withJars(Lists.newArrayList("/path/to/jar1.jar", 
"/path/to/jar2.jar"))
+            .withFiles(Lists.newArrayList("/path/to/file1.txt", 
"/path/to/file2.txt"))
+            .withArchives(Lists.newArrayList("/path/to/archive1.zip", 
"/path/to/archive2.zip"))
+            .withConfigs(ImmutableMap.of("spark.executor.memory", "2g"))
+            
.withAudit(AuditDTO.builder().withCreator("test").withCreateTime(Instant.now()).build())
+            .build();
+
+    Assertions.assertDoesNotThrow(nullCustomFieldsJobTemplateDTO::validate);
+    String nullCustomFieldsSerJson =
+        
JsonUtils.objectMapper().writeValueAsString(nullCustomFieldsJobTemplateDTO);
+    JobTemplateDTO nullCustomFieldsDeserJobTemplateDTO =
+        JsonUtils.objectMapper().readValue(nullCustomFieldsSerJson, 
JobTemplateDTO.class);
+    Assertions.assertEquals(nullCustomFieldsJobTemplateDTO, 
nullCustomFieldsDeserJobTemplateDTO);
+
+    // Test Jars are null
+    SparkJobTemplateDTO nullJarsJobTemplateDTO =
+        SparkJobTemplateDTO.builder()
+            .withJobType(JobTemplate.JobType.SPARK)
+            .withName("testSparkJobNullJars")
+            .withExecutable("/path/to/spark-submit")
+            .withClassName("com.example.Main")
+            .withFiles(Lists.newArrayList("/path/to/file1.txt", 
"/path/to/file2.txt"))
+            .withArchives(Lists.newArrayList("/path/to/archive1.zip", 
"/path/to/archive2.zip"))
+            .withConfigs(ImmutableMap.of("spark.executor.memory", "2g"))
+            
.withAudit(AuditDTO.builder().withCreator("test").withCreateTime(Instant.now()).build())
+            .build();
+
+    Assertions.assertDoesNotThrow(nullJarsJobTemplateDTO::validate);
+    String nullJarsSerJson = 
JsonUtils.objectMapper().writeValueAsString(nullJarsJobTemplateDTO);
+    JobTemplateDTO nullJarsDeserJobTemplateDTO =
+        JsonUtils.objectMapper().readValue(nullJarsSerJson, 
JobTemplateDTO.class);
+    Assertions.assertEquals(nullJarsJobTemplateDTO, 
nullJarsDeserJobTemplateDTO);
+
+    // Test files are null
+    SparkJobTemplateDTO nullFilesJobTemplateDTO =
+        SparkJobTemplateDTO.builder()
+            .withJobType(JobTemplate.JobType.SPARK)
+            .withName("testSparkJobNullFiles")
+            .withExecutable("/path/to/spark-submit")
+            .withClassName("com.example.Main")
+            .withArchives(Lists.newArrayList("/path/to/archive1.zip", 
"/path/to/archive2.zip"))
+            .withConfigs(ImmutableMap.of("spark.executor.memory", "2g"))
+            
.withAudit(AuditDTO.builder().withCreator("test").withCreateTime(Instant.now()).build())
+            .build();
+
+    Assertions.assertDoesNotThrow(nullFilesJobTemplateDTO::validate);
+    String nullFilesSerJson = 
JsonUtils.objectMapper().writeValueAsString(nullFilesJobTemplateDTO);
+    JobTemplateDTO nullFilesDeserJobTemplateDTO =
+        JsonUtils.objectMapper().readValue(nullFilesSerJson, 
JobTemplateDTO.class);
+    Assertions.assertEquals(nullFilesJobTemplateDTO, 
nullFilesDeserJobTemplateDTO);
+
+    // Test archives are null
+    SparkJobTemplateDTO nullArchivesJobTemplateDTO =
+        SparkJobTemplateDTO.builder()
+            .withJobType(JobTemplate.JobType.SPARK)
+            .withName("testSparkJobNullArchives")
+            .withExecutable("/path/to/spark-submit")
+            .withClassName("com.example.Main")
+            .withConfigs(ImmutableMap.of("spark.executor.memory", "2g"))
+            
.withAudit(AuditDTO.builder().withCreator("test").withCreateTime(Instant.now()).build())
+            .build();
+
+    Assertions.assertDoesNotThrow(nullArchivesJobTemplateDTO::validate);
+    String nullArchivesSerJson =
+        
JsonUtils.objectMapper().writeValueAsString(nullArchivesJobTemplateDTO);
+    JobTemplateDTO nullArchivesDeserJobTemplateDTO =
+        JsonUtils.objectMapper().readValue(nullArchivesSerJson, 
JobTemplateDTO.class);
+    Assertions.assertEquals(nullArchivesJobTemplateDTO, 
nullArchivesDeserJobTemplateDTO);
+
+    // Test configs are null
+    SparkJobTemplateDTO nullConfigsJobTemplateDTO =
+        SparkJobTemplateDTO.builder()
+            .withJobType(JobTemplate.JobType.SPARK)
+            .withName("testSparkJobNullConfigs")
+            .withExecutable("/path/to/spark-submit")
+            .withClassName("com.example.Main")
+            
.withAudit(AuditDTO.builder().withCreator("test").withCreateTime(Instant.now()).build())
+            .build();
+
+    Assertions.assertDoesNotThrow(nullConfigsJobTemplateDTO::validate);
+    String nullConfigsSerJson =
+        JsonUtils.objectMapper().writeValueAsString(nullConfigsJobTemplateDTO);
+    JobTemplateDTO nullConfigsDeserJobTemplateDTO =
+        JsonUtils.objectMapper().readValue(nullConfigsSerJson, 
JobTemplateDTO.class);
+    Assertions.assertEquals(nullConfigsJobTemplateDTO, 
nullConfigsDeserJobTemplateDTO);
+
+    // Test name is null
+    Assertions.assertThrows(
+        IllegalArgumentException.class,
+        () -> {
+          SparkJobTemplateDTO template =
+              SparkJobTemplateDTO.builder()
+                  .withJobType(JobTemplate.JobType.SPARK)
+                  .withExecutable("/path/to/spark-submit")
+                  .withAudit(
+                      
AuditDTO.builder().withCreator("test").withCreateTime(Instant.now()).build())
+                  .build();
+          template.validate();
+        },
+        "\"name\" is required and cannot be empty");
+
+    // Test executable is null
+    Assertions.assertThrows(
+        IllegalArgumentException.class,
+        () -> {
+          SparkJobTemplateDTO template =
+              SparkJobTemplateDTO.builder()
+                  .withJobType(JobTemplate.JobType.SPARK)
+                  .withName("testSparkJobNullExecutable")
+                  .withAudit(
+                      
AuditDTO.builder().withCreator("test").withCreateTime(Instant.now()).build())
+                  .build();
+          template.validate();
+        },
+        "\"executable\" is required and cannot be empty");
+
+    // Test jobType is null
+    Assertions.assertThrows(
+        IllegalArgumentException.class,
+        () -> {
+          SparkJobTemplateDTO template =
+              SparkJobTemplateDTO.builder()
+                  .withName("testSparkJobNullJobType")
+                  .withExecutable("/path/to/spark-submit")
+                  .withAudit(
+                      
AuditDTO.builder().withCreator("test").withCreateTime(Instant.now()).build())
+                  .build();
+          template.validate();
+        },
+        "\"jobType\" is required and cannot be null");
+
+    // Test className is null
+    Assertions.assertThrows(
+        IllegalArgumentException.class,
+        () -> {
+          SparkJobTemplateDTO template =
+              SparkJobTemplateDTO.builder()
+                  .withJobType(JobTemplate.JobType.SPARK)
+                  .withName("testSparkJobNullClassName")
+                  .withExecutable("/path/to/spark-submit")
+                  .withAudit(
+                      
AuditDTO.builder().withCreator("test").withCreateTime(Instant.now()).build())
+                  .build();
+          template.validate();
+        },
+        "\"className\" is required and cannot be empty");
+  }
+}
diff --git 
a/common/src/test/java/org/apache/gravitino/dto/requests/TestJobTemplateRegisterRequest.java
 
b/common/src/test/java/org/apache/gravitino/dto/requests/TestJobTemplateRegisterRequest.java
new file mode 100644
index 0000000000..d8758badc6
--- /dev/null
+++ 
b/common/src/test/java/org/apache/gravitino/dto/requests/TestJobTemplateRegisterRequest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.dto.requests;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import java.time.Instant;
+import org.apache.gravitino.dto.AuditDTO;
+import org.apache.gravitino.dto.job.JobTemplateDTO;
+import org.apache.gravitino.dto.job.ShellJobTemplateDTO;
+import org.apache.gravitino.dto.job.SparkJobTemplateDTO;
+import org.apache.gravitino.job.JobTemplate;
+import org.apache.gravitino.json.JsonUtils;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class TestJobTemplateRegisterRequest {
+
+  @Test
+  public void testJobTemplateRegisterRequestSerDe() throws 
JsonProcessingException {
+    JobTemplateDTO shellJobTemplateDTO =
+        ShellJobTemplateDTO.builder()
+            .withJobType(JobTemplate.JobType.SHELL)
+            .withName("testShellJob")
+            .withComment("This is a test shell job template")
+            .withExecutable("/path/to/shell")
+            .withArguments(Lists.newArrayList("arg1", "arg2"))
+            .withEnvironments(ImmutableMap.of("ENV_VAR", "value"))
+            .withCustomFields(ImmutableMap.of("customField1", "value1"))
+            .withScripts(Lists.newArrayList("/path/to/script1.sh", 
"/path/to/script2.sh"))
+            
.withAudit(AuditDTO.builder().withCreator("test").withCreateTime(Instant.now()).build())
+            .build();
+
+    JobTemplateRegisterRequest request = new 
JobTemplateRegisterRequest(shellJobTemplateDTO);
+
+    String serJson = JsonUtils.objectMapper().writeValueAsString(request);
+    JobTemplateRegisterRequest deserRequest =
+        JsonUtils.objectMapper().readValue(serJson, 
JobTemplateRegisterRequest.class);
+    Assertions.assertEquals(shellJobTemplateDTO, 
deserRequest.getJobTemplate());
+
+    JobTemplateDTO sparkJobTemplateDTO =
+        SparkJobTemplateDTO.builder()
+            .withJobType(JobTemplate.JobType.SPARK)
+            .withName("testSparkJob")
+            .withComment("This is a test spark job template")
+            .withExecutable("/path/to/spark-submit")
+            .withArguments(Lists.newArrayList("--class", "com.example.Main"))
+            .withEnvironments(ImmutableMap.of("SPARK_ENV_VAR", "value"))
+            .withCustomFields(ImmutableMap.of("customField1", "value1"))
+            .withClassName("com.example.Main")
+            .withJars(Lists.newArrayList("/path/to/jar1.jar", 
"/path/to/jar2.jar"))
+            .withFiles(Lists.newArrayList("/path/to/file1.txt", 
"/path/to/file2.txt"))
+            .withArchives(Lists.newArrayList("/path/to/archive1.zip", 
"/path/to/archive2.zip"))
+            .withConfigs(ImmutableMap.of("spark.executor.memory", "2g"))
+            
.withAudit(AuditDTO.builder().withCreator("test").withCreateTime(Instant.now()).build())
+            .build();
+
+    JobTemplateRegisterRequest sparkRequest = new 
JobTemplateRegisterRequest(sparkJobTemplateDTO);
+
+    String sparkSerJson = 
JsonUtils.objectMapper().writeValueAsString(sparkRequest);
+    JobTemplateRegisterRequest sparkDeserRequest =
+        JsonUtils.objectMapper().readValue(sparkSerJson, 
JobTemplateRegisterRequest.class);
+
+    Assertions.assertEquals(sparkJobTemplateDTO, 
sparkDeserRequest.getJobTemplate());
+  }
+}
diff --git a/core/src/main/java/org/apache/gravitino/GravitinoEnv.java 
b/core/src/main/java/org/apache/gravitino/GravitinoEnv.java
index 4651fada19..cdc90ee63d 100644
--- a/core/src/main/java/org/apache/gravitino/GravitinoEnv.java
+++ b/core/src/main/java/org/apache/gravitino/GravitinoEnv.java
@@ -58,6 +58,8 @@ import org.apache.gravitino.hook.ModelHookDispatcher;
 import org.apache.gravitino.hook.SchemaHookDispatcher;
 import org.apache.gravitino.hook.TableHookDispatcher;
 import org.apache.gravitino.hook.TopicHookDispatcher;
+import org.apache.gravitino.job.JobManager;
+import org.apache.gravitino.job.JobOperationDispatcher;
 import org.apache.gravitino.listener.CatalogEventDispatcher;
 import org.apache.gravitino.listener.EventBus;
 import org.apache.gravitino.listener.EventListenerManager;
@@ -138,6 +140,8 @@ public class GravitinoEnv {
 
   private AuditLogManager auditLogManager;
 
+  private JobOperationDispatcher jobOperationDispatcher;
+
   private EventBus eventBus;
   private OwnerDispatcher ownerDispatcher;
   private FutureGrantManager futureGrantManager;
@@ -346,9 +350,9 @@ public class GravitinoEnv {
   }
 
   /**
-   * Get the AuditLogManager associated with the Gravitino environment.
+   * Get the PolicyDispatcher associated with the Gravitino environment.
    *
-   * @return The AuditLogManager instance.
+   * @return The PolicyDispatcher instance.
    */
   public PolicyDispatcher policyDispatcher() {
     return policyDispatcher;
@@ -399,6 +403,16 @@ public class GravitinoEnv {
     return gravitinoAuthorizer;
   }
 
+  /**
+   * Get the JobOperationDispatcher associated with the Gravitino environment.
+   *
+   * @return The JobOperationDispatcher instance.
+   */
+  public JobOperationDispatcher jobOperationDispatcher() {
+    Preconditions.checkArgument(jobOperationDispatcher != null, "GravitinoEnv 
is not initialized.");
+    return jobOperationDispatcher;
+  }
+
   public void start() {
     metricsSystem.start();
     eventListenerManager.start();
@@ -443,6 +457,15 @@ public class GravitinoEnv {
       metalakeManager.close();
     }
 
+    if (jobOperationDispatcher != null) {
+      try {
+        jobOperationDispatcher.close();
+        jobOperationDispatcher = null;
+      } catch (Exception e) {
+        LOG.warn("Failed to close JobOperationDispatcher", e);
+      }
+    }
+
     LOG.info("Gravitino Environment is shut down.");
   }
 
@@ -560,5 +583,8 @@ public class GravitinoEnv {
     this.tagDispatcher = new TagEventDispatcher(eventBus, new 
TagManager(idGenerator, entityStore));
     // todo: support policy event dispatcher
     this.policyDispatcher = new PolicyManager(idGenerator, entityStore);
+
+    // TODO: Support event for job operation dispatcher
+    this.jobOperationDispatcher = new JobManager(config, entityStore, 
idGenerator);
   }
 }
diff --git a/core/src/main/java/org/apache/gravitino/job/JobManager.java 
b/core/src/main/java/org/apache/gravitino/job/JobManager.java
index 8bb193a7a3..4ecb5066b9 100644
--- a/core/src/main/java/org/apache/gravitino/job/JobManager.java
+++ b/core/src/main/java/org/apache/gravitino/job/JobManager.java
@@ -22,7 +22,6 @@ package org.apache.gravitino.job;
 import static org.apache.gravitino.metalake.MetalakeManager.checkMetalake;
 
 import com.google.common.annotations.VisibleForTesting;
-import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
 import java.net.URI;
@@ -60,7 +59,7 @@ import org.apache.gravitino.utils.NameIdentifierUtil;
 import org.apache.gravitino.utils.NamespaceUtil;
 import org.apache.gravitino.utils.PrincipalUtils;
 
-public class JobManager implements JobOperationDispatcher, Closeable {
+public class JobManager implements JobOperationDispatcher {
 
   private static final Pattern PLACEHOLDER_PATTERN = 
Pattern.compile("\\{\\{([\\w.-]+)\\}\\}");
 
diff --git 
a/core/src/main/java/org/apache/gravitino/job/JobOperationDispatcher.java 
b/core/src/main/java/org/apache/gravitino/job/JobOperationDispatcher.java
index 0c83862f4c..464b430847 100644
--- a/core/src/main/java/org/apache/gravitino/job/JobOperationDispatcher.java
+++ b/core/src/main/java/org/apache/gravitino/job/JobOperationDispatcher.java
@@ -19,6 +19,7 @@
 
 package org.apache.gravitino.job;
 
+import java.io.Closeable;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -30,7 +31,7 @@ import org.apache.gravitino.meta.JobEntity;
 import org.apache.gravitino.meta.JobTemplateEntity;
 
 /** The interface for job operation dispatcher. */
-public interface JobOperationDispatcher {
+public interface JobOperationDispatcher extends Closeable {
 
   /**
    * Lists all the job templates in the specified metalake.
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/JobMetaPostgreSQLProvider.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/JobMetaPostgreSQLProvider.java
index 56e667d274..ec9f851d74 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/JobMetaPostgreSQLProvider.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/JobMetaPostgreSQLProvider.java
@@ -40,7 +40,7 @@ public class JobMetaPostgreSQLProvider extends 
JobMetaBaseSQLProvider {
         + " WHERE job_template_name = #{jobMeta.jobTemplateName}"
         + " AND metalake_id = #{jobMeta.metalakeId} AND deleted_at = 0),"
         + " #{jobMeta.metalakeId}, #{jobMeta.jobExecutionId},"
-        + " #{jobMeta.jobRunStatus}, #{jobMeta.jobFinished}, 
#{jobMeta.auditInfo},"
+        + " #{jobMeta.jobRunStatus}, #{jobMeta.jobFinishedAt}, 
#{jobMeta.auditInfo},"
         + " #{jobMeta.currentVersion}, #{jobMeta.lastVersion},"
         + " #{jobMeta.deletedAt})"
         + " ON CONFLICT (job_run_id) DO UPDATE SET"
diff --git 
a/server/src/main/java/org/apache/gravitino/server/GravitinoServer.java 
b/server/src/main/java/org/apache/gravitino/server/GravitinoServer.java
index 4262446fe6..da76701444 100644
--- a/server/src/main/java/org/apache/gravitino/server/GravitinoServer.java
+++ b/server/src/main/java/org/apache/gravitino/server/GravitinoServer.java
@@ -34,6 +34,7 @@ import org.apache.gravitino.catalog.SchemaDispatcher;
 import org.apache.gravitino.catalog.TableDispatcher;
 import org.apache.gravitino.catalog.TopicDispatcher;
 import org.apache.gravitino.credential.CredentialOperationDispatcher;
+import org.apache.gravitino.job.JobOperationDispatcher;
 import org.apache.gravitino.lineage.LineageConfig;
 import org.apache.gravitino.lineage.LineageDispatcher;
 import org.apache.gravitino.lineage.LineageService;
@@ -143,6 +144,7 @@ public class GravitinoServer extends ResourceConfig {
                 .ranked(1);
             
bind(gravitinoEnv.modelDispatcher()).to(ModelDispatcher.class).ranked(1);
             bind(lineageService).to(LineageDispatcher.class).ranked(1);
+            
bind(gravitinoEnv.jobOperationDispatcher()).to(JobOperationDispatcher.class).ranked(1);
           }
         });
     register(JsonProcessingExceptionMapper.class);
diff --git 
a/server/src/main/java/org/apache/gravitino/server/web/rest/ExceptionHandlers.java
 
b/server/src/main/java/org/apache/gravitino/server/web/rest/ExceptionHandlers.java
index b71219b045..04025fb902 100644
--- 
a/server/src/main/java/org/apache/gravitino/server/web/rest/ExceptionHandlers.java
+++ 
b/server/src/main/java/org/apache/gravitino/server/web/rest/ExceptionHandlers.java
@@ -29,6 +29,7 @@ import 
org.apache.gravitino.exceptions.FilesetAlreadyExistsException;
 import org.apache.gravitino.exceptions.ForbiddenException;
 import org.apache.gravitino.exceptions.GroupAlreadyExistsException;
 import org.apache.gravitino.exceptions.InUseException;
+import org.apache.gravitino.exceptions.JobTemplateAlreadyExistsException;
 import org.apache.gravitino.exceptions.MetalakeAlreadyExistsException;
 import org.apache.gravitino.exceptions.MetalakeInUseException;
 import org.apache.gravitino.exceptions.MetalakeNotInUseException;
@@ -133,6 +134,11 @@ public class ExceptionHandlers {
     return ModelExceptionHandler.INSTANCE.handle(op, model, schema, e);
   }
 
+  public static Response handleJobTemplateException(
+      OperationType op, String jobTemplate, String metalake, Exception e) {
+    return JobTemplateExceptionHandler.INSTANCE.handle(op, jobTemplate, 
metalake, e);
+  }
+
   public static Response handleTestConnectionException(Exception e) {
     ErrorResponse response;
     if (e instanceof IllegalArgumentException) {
@@ -774,6 +780,47 @@ public class ExceptionHandlers {
     }
   }
 
+  private static class JobTemplateExceptionHandler extends 
BaseExceptionHandler {
+
+    private static final ExceptionHandler INSTANCE = new 
JobTemplateExceptionHandler();
+
+    private static String getJobTemplateErrorMsg(
+        String jobTemplate, String operation, String parent, String reason) {
+      return String.format(
+          "Failed to operate job template(s)%s operation [%s] under object 
[%s], reason [%s]",
+          jobTemplate, operation, parent, reason);
+    }
+
+    @Override
+    public Response handle(OperationType op, String jobTemplate, String 
parent, Exception e) {
+      String formatted = StringUtil.isBlank(jobTemplate) ? "" : " [" + 
jobTemplate + "]";
+      String errorMsg = getJobTemplateErrorMsg(formatted, op.name(), parent, 
getErrorMsg(e));
+      LOG.warn(errorMsg, e);
+
+      if (e instanceof IllegalArgumentException) {
+        return Utils.illegalArguments(errorMsg, e);
+
+      } else if (e instanceof NotFoundException) {
+        return Utils.notFound(errorMsg, e);
+
+      } else if (e instanceof JobTemplateAlreadyExistsException) {
+        return Utils.alreadyExists(errorMsg, e);
+
+      } else if (e instanceof InUseException) {
+        return Utils.inUse(errorMsg, e);
+
+      } else if (e instanceof NotInUseException) {
+        return Utils.notInUse(errorMsg, e);
+
+      } else if (e instanceof ForbiddenException) {
+        return Utils.forbidden(errorMsg, e);
+
+      } else {
+        return super.handle(op, jobTemplate, parent, e);
+      }
+    }
+  }
+
   @VisibleForTesting
   static class BaseExceptionHandler extends ExceptionHandler {
 
diff --git 
a/server/src/main/java/org/apache/gravitino/server/web/rest/JobOperations.java 
b/server/src/main/java/org/apache/gravitino/server/web/rest/JobOperations.java
new file mode 100644
index 0000000000..daf120c643
--- /dev/null
+++ 
b/server/src/main/java/org/apache/gravitino/server/web/rest/JobOperations.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.server.web.rest;
+
+import com.codahale.metrics.annotation.ResponseMetered;
+import com.codahale.metrics.annotation.Timed;
+import com.google.common.annotations.VisibleForTesting;
+import java.time.Instant;
+import java.util.List;
+import java.util.stream.Collectors;
+import javax.inject.Inject;
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.Response;
+import org.apache.gravitino.GravitinoEnv;
+import org.apache.gravitino.dto.job.JobTemplateDTO;
+import org.apache.gravitino.dto.job.ShellJobTemplateDTO;
+import org.apache.gravitino.dto.job.SparkJobTemplateDTO;
+import org.apache.gravitino.dto.requests.JobTemplateRegisterRequest;
+import org.apache.gravitino.dto.responses.BaseResponse;
+import org.apache.gravitino.dto.responses.DropResponse;
+import org.apache.gravitino.dto.responses.JobTemplateListResponse;
+import org.apache.gravitino.dto.responses.JobTemplateResponse;
+import org.apache.gravitino.dto.responses.NameListResponse;
+import org.apache.gravitino.dto.util.DTOConverters;
+import org.apache.gravitino.job.JobOperationDispatcher;
+import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.meta.JobTemplateEntity;
+import org.apache.gravitino.metrics.MetricNames;
+import org.apache.gravitino.server.web.Utils;
+import org.apache.gravitino.utils.NamespaceUtil;
+import org.apache.gravitino.utils.PrincipalUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Path("metalakes/{metalake}/jobs")
+public class JobOperations {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(JobOperations.class);
+
+  private final JobOperationDispatcher jobOperationDispatcher;
+
+  @Context HttpServletRequest httpRequest;
+
+  @Inject
+  public JobOperations(JobOperationDispatcher jobOperationDispatcher) {
+    this.jobOperationDispatcher = jobOperationDispatcher;
+  }
+
+  @GET
+  @Path("templates")
+  @Produces("application/vnd.gravitino.v1+json")
+  @Timed(name = "list-job-templates." + MetricNames.HTTP_PROCESS_DURATION, 
absolute = true)
+  @ResponseMetered(name = "list-job-templates", absolute = true)
+  public Response listJobTemplates(
+      @PathParam("metalake") String metalake,
+      @QueryParam("details") @DefaultValue("false") boolean details) {
+    LOG.info(
+        "Received request to list job templates in metalake: {}, details: {}", 
metalake, details);
+
+    try {
+      return Utils.doAs(
+          httpRequest,
+          () -> {
+            List<JobTemplateDTO> jobTemplates =
+                toDTOs(jobOperationDispatcher.listJobTemplates(metalake));
+            if (details) {
+              LOG.info("List {} job templates in metalake: {}", 
jobTemplates.size(), metalake);
+              return Utils.ok(new JobTemplateListResponse(jobTemplates));
+
+            } else {
+              String[] jobTemplateNames =
+                  
jobTemplates.stream().map(JobTemplateDTO::name).toArray(String[]::new);
+
+              LOG.info(
+                  "List {} job template names in metalake: {}", 
jobTemplateNames.length, metalake);
+              return Utils.ok(new NameListResponse(jobTemplateNames));
+            }
+          });
+
+    } catch (Exception e) {
+      return ExceptionHandlers.handleJobTemplateException(OperationType.LIST, 
"", metalake, e);
+    }
+  }
+
+  @POST
+  @Path("templates")
+  @Produces("application/vnd.gravitino.v1+json")
+  @Timed(name = "register-job-template." + MetricNames.HTTP_PROCESS_DURATION, 
absolute = true)
+  @ResponseMetered(name = "register-job-template", absolute = true)
+  public Response registerJobTemplate(
+      @PathParam("metalake") String metalake, JobTemplateRegisterRequest 
request) {
+    LOG.info("Received request to register job template in metalake: {}", 
metalake);
+
+    try {
+      return Utils.doAs(
+          httpRequest,
+          () -> {
+            request.validate();
+
+            jobOperationDispatcher.registerJobTemplate(
+                metalake, toEntity(metalake, request.getJobTemplate()));
+
+            LOG.info(
+                "Registered job template {} in metalake: {}",
+                request.getJobTemplate().name(),
+                metalake);
+            return Utils.ok(new BaseResponse());
+          });
+
+    } catch (Exception e) {
+      return ExceptionHandlers.handleJobTemplateException(
+          OperationType.REGISTER, request.getJobTemplate().name(), metalake, 
e);
+    }
+  }
+
+  @GET
+  @Path("templates/{name}")
+  @Produces("application/vnd.gravitino.v1+json")
+  @Timed(name = "get-job-template." + MetricNames.HTTP_PROCESS_DURATION, 
absolute = true)
+  @ResponseMetered(name = "get-job-template", absolute = true)
+  public Response getJobTemplate(
+      @PathParam("metalake") String metalake, @PathParam("name") String name) {
+    LOG.info("Received request to get job template: {} in metalake: {}", name, 
metalake);
+
+    try {
+      return Utils.doAs(
+          httpRequest,
+          () -> {
+            JobTemplateEntity jobTemplateEntity =
+                jobOperationDispatcher.getJobTemplate(metalake, name);
+
+            LOG.info("Retrieved job template {} in metalake: {}", name, 
metalake);
+            return Utils.ok(new JobTemplateResponse(toDTO(jobTemplateEntity)));
+          });
+
+    } catch (Exception e) {
+      return ExceptionHandlers.handleJobTemplateException(OperationType.GET, 
name, metalake, e);
+    }
+  }
+
+  @DELETE
+  @Path("templates/{name}")
+  @Produces("application/vnd.gravitino.v1+json")
+  @Timed(name = "delete-job-template." + MetricNames.HTTP_PROCESS_DURATION, 
absolute = true)
+  @ResponseMetered(name = "delete-job-template", absolute = true)
+  public Response deleteJobTemplate(
+      @PathParam("metalake") String metalake, @PathParam("name") String name) {
+    LOG.info("Received request to delete job template: {} in metalake: {}", 
name, metalake);
+
+    try {
+      return Utils.doAs(
+          httpRequest,
+          () -> {
+            boolean deleted = 
jobOperationDispatcher.deleteJobTemplate(metalake, name);
+            if (!deleted) {
+              LOG.warn("Cannot find job template {} in metalake {}", name, 
metalake);
+            } else {
+              LOG.info("Deleted job template {} in metalake {}", name, 
metalake);
+            }
+
+            return Utils.ok(new DropResponse(deleted));
+          });
+
+    } catch (Exception e) {
+      return 
ExceptionHandlers.handleJobTemplateException(OperationType.DELETE, name, 
metalake, e);
+    }
+  }
+
+  private static List<JobTemplateDTO> toDTOs(List<JobTemplateEntity> 
jobTemplateEntities) {
+    return 
jobTemplateEntities.stream().map(JobOperations::toDTO).collect(Collectors.toList());
+  }
+
+  @VisibleForTesting
+  static JobTemplateDTO toDTO(JobTemplateEntity jobTemplateEntity) {
+    switch (jobTemplateEntity.templateContent().jobType()) {
+      case SHELL:
+        return ShellJobTemplateDTO.builder()
+            .withName(jobTemplateEntity.name())
+            .withComment(jobTemplateEntity.comment())
+            .withJobType(jobTemplateEntity.templateContent().jobType())
+            .withExecutable(jobTemplateEntity.templateContent().executable())
+            .withArguments(jobTemplateEntity.templateContent().arguments())
+            
.withEnvironments(jobTemplateEntity.templateContent().environments())
+            
.withCustomFields(jobTemplateEntity.templateContent().customFields())
+            .withScripts(jobTemplateEntity.templateContent().scripts())
+            .withAudit(DTOConverters.toDTO(jobTemplateEntity.auditInfo()))
+            .build();
+
+      case SPARK:
+        return SparkJobTemplateDTO.builder()
+            .withName(jobTemplateEntity.name())
+            .withComment(jobTemplateEntity.comment())
+            .withJobType(jobTemplateEntity.templateContent().jobType())
+            .withExecutable(jobTemplateEntity.templateContent().executable())
+            .withArguments(jobTemplateEntity.templateContent().arguments())
+            
.withEnvironments(jobTemplateEntity.templateContent().environments())
+            
.withCustomFields(jobTemplateEntity.templateContent().customFields())
+            .withClassName(jobTemplateEntity.templateContent().className())
+            .withJars(jobTemplateEntity.templateContent().jars())
+            .withFiles(jobTemplateEntity.templateContent().files())
+            .withArchives(jobTemplateEntity.templateContent().archives())
+            .withConfigs(jobTemplateEntity.templateContent().configs())
+            .withAudit(DTOConverters.toDTO(jobTemplateEntity.auditInfo()))
+            .build();
+
+      default:
+        throw new IllegalArgumentException(
+            "Unsupported job template type: " + 
jobTemplateEntity.templateContent().jobType());
+    }
+  }
+
+  private static JobTemplateEntity toEntity(String metalake, JobTemplateDTO 
jobTemplateDTO) {
+    return JobTemplateEntity.builder()
+        .withId(GravitinoEnv.getInstance().idGenerator().nextId())
+        .withName(jobTemplateDTO.name())
+        .withNamespace(NamespaceUtil.ofJobTemplate(metalake))
+        .withComment(jobTemplateDTO.comment())
+        .withTemplateContent(
+            JobTemplateEntity.TemplateContent.fromJobTemplate(
+                DTOConverters.fromDTO(jobTemplateDTO)))
+        .withAuditInfo(
+            AuditInfo.builder()
+                .withCreator(PrincipalUtils.getCurrentPrincipal().getName())
+                .withCreateTime(Instant.now())
+                .build())
+        .build();
+  }
+}
diff --git 
a/server/src/test/java/org/apache/gravitino/server/web/rest/TestJobOperations.java
 
b/server/src/test/java/org/apache/gravitino/server/web/rest/TestJobOperations.java
new file mode 100644
index 0000000000..a374edf21d
--- /dev/null
+++ 
b/server/src/test/java/org/apache/gravitino/server/web/rest/TestJobOperations.java
@@ -0,0 +1,532 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.server.web.rest;
+
+import static javax.ws.rs.core.MediaType.APPLICATION_JSON_TYPE;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.time.Instant;
+import java.util.Random;
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.core.Application;
+import javax.ws.rs.core.Response;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.gravitino.GravitinoEnv;
+import org.apache.gravitino.dto.job.JobTemplateDTO;
+import org.apache.gravitino.dto.requests.JobTemplateRegisterRequest;
+import org.apache.gravitino.dto.responses.BaseResponse;
+import org.apache.gravitino.dto.responses.DropResponse;
+import org.apache.gravitino.dto.responses.ErrorConstants;
+import org.apache.gravitino.dto.responses.ErrorResponse;
+import org.apache.gravitino.dto.responses.JobTemplateListResponse;
+import org.apache.gravitino.dto.responses.JobTemplateResponse;
+import org.apache.gravitino.dto.responses.NameListResponse;
+import org.apache.gravitino.exceptions.InUseException;
+import org.apache.gravitino.exceptions.JobTemplateAlreadyExistsException;
+import org.apache.gravitino.exceptions.MetalakeNotInUseException;
+import org.apache.gravitino.exceptions.NoSuchJobTemplateException;
+import org.apache.gravitino.exceptions.NoSuchMetalakeException;
+import org.apache.gravitino.job.JobOperationDispatcher;
+import org.apache.gravitino.job.ShellJobTemplate;
+import org.apache.gravitino.job.SparkJobTemplate;
+import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.meta.JobTemplateEntity;
+import org.apache.gravitino.rest.RESTUtils;
+import org.apache.gravitino.server.web.ObjectMapperProvider;
+import org.apache.gravitino.storage.IdGenerator;
+import org.apache.gravitino.storage.RandomIdGenerator;
+import org.apache.gravitino.utils.NamespaceUtil;
+import org.glassfish.jersey.internal.inject.AbstractBinder;
+import org.glassfish.jersey.server.ResourceConfig;
+import org.glassfish.jersey.test.JerseyTest;
+import org.glassfish.jersey.test.TestProperties;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+public class TestJobOperations extends JerseyTest {
+
+  private static class MockServletRequestFactory extends 
ServletRequestFactoryBase {
+
+    @Override
+    public HttpServletRequest get() {
+      HttpServletRequest request = mock(HttpServletRequest.class);
+      when(request.getRemoteUser()).thenReturn(null);
+      return request;
+    }
+  }
+
+  private final JobOperationDispatcher jobOperationDispatcher = 
mock(JobOperationDispatcher.class);
+
+  private final String metalake = "test_metalake";
+
+  private final AuditInfo auditInfo =
+      
AuditInfo.builder().withCreator("test_user").withCreateTime(Instant.now()).build();
+
+  @Override
+  protected Application configure() {
+    try {
+      forceSet(
+          TestProperties.CONTAINER_PORT, 
String.valueOf(RESTUtils.findAvailablePort(2000, 3000)));
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
+    ResourceConfig resourceConfig = new ResourceConfig();
+    resourceConfig.register(JobOperations.class);
+    resourceConfig.register(ObjectMapperProvider.class);
+    resourceConfig.register(
+        new AbstractBinder() {
+          @Override
+          protected void configure() {
+            
bind(jobOperationDispatcher).to(JobOperationDispatcher.class).ranked(2);
+            bindFactory(TestJobOperations.MockServletRequestFactory.class)
+                .to(HttpServletRequest.class);
+          }
+        });
+
+    return resourceConfig;
+  }
+
+  @BeforeAll
+  public static void setup() throws IllegalAccessException {
+    IdGenerator idGenerator = new RandomIdGenerator();
+    FieldUtils.writeField(GravitinoEnv.getInstance(), "idGenerator", 
idGenerator, true);
+  }
+
+  @Test
+  public void testListJobTemplates() {
+    JobTemplateEntity template1 =
+        newShellJobTemplateEntity("shell_template_1", "Test Shell Template 1");
+    JobTemplateEntity template2 =
+        newSparkJobTemplateEntity("spark_template_1", "Test Spark Template 1");
+
+    when(jobOperationDispatcher.listJobTemplates(metalake))
+        .thenReturn(Lists.newArrayList(template1, template2));
+
+    Response resp =
+        target(jobTemplatePath())
+            .request(APPLICATION_JSON_TYPE)
+            .accept("application/vnd.gravitino.v1+json")
+            .get();
+
+    Assertions.assertEquals(Response.Status.OK.getStatusCode(), 
resp.getStatus());
+    Assertions.assertEquals(APPLICATION_JSON_TYPE, resp.getMediaType());
+
+    NameListResponse nameListResponse = 
resp.readEntity(NameListResponse.class);
+    Assertions.assertEquals(0, nameListResponse.getCode());
+    String[] expectedNames = {template1.name(), template2.name()};
+    Assertions.assertArrayEquals(expectedNames, nameListResponse.getNames());
+
+    // Test list details
+    Response resp1 =
+        target(jobTemplatePath())
+            .queryParam("details", "true")
+            .request(APPLICATION_JSON_TYPE)
+            .accept("application/vnd.gravitino.v1+json")
+            .get();
+
+    Assertions.assertEquals(Response.Status.OK.getStatusCode(), 
resp1.getStatus());
+    Assertions.assertEquals(APPLICATION_JSON_TYPE, resp1.getMediaType());
+
+    JobTemplateListResponse jobTemplateListResponse =
+        resp1.readEntity(JobTemplateListResponse.class);
+    Assertions.assertEquals(0, jobTemplateListResponse.getCode());
+
+    Assertions.assertEquals(2, 
jobTemplateListResponse.getJobTemplates().size());
+    Assertions.assertEquals(
+        JobOperations.toDTO(template1), 
jobTemplateListResponse.getJobTemplates().get(0));
+    Assertions.assertEquals(
+        JobOperations.toDTO(template2), 
jobTemplateListResponse.getJobTemplates().get(1));
+
+    // Test throw NoSuchMetalakeException
+    doThrow(new NoSuchMetalakeException("mock error"))
+        .when(jobOperationDispatcher)
+        .listJobTemplates(metalake);
+
+    Response resp2 =
+        target(jobTemplatePath())
+            .request(APPLICATION_JSON_TYPE)
+            .accept("application/vnd.gravitino.v1+json")
+            .get();
+
+    Assertions.assertEquals(Response.Status.NOT_FOUND.getStatusCode(), 
resp2.getStatus());
+
+    ErrorResponse errorResp = resp2.readEntity(ErrorResponse.class);
+    Assertions.assertEquals(ErrorConstants.NOT_FOUND_CODE, 
errorResp.getCode());
+    Assertions.assertEquals(NoSuchMetalakeException.class.getSimpleName(), 
errorResp.getType());
+
+    // Test throw MetalakeNotInUseException
+    doThrow(new MetalakeNotInUseException("mock error"))
+        .when(jobOperationDispatcher)
+        .listJobTemplates(metalake);
+
+    Response resp3 =
+        target(jobTemplatePath())
+            .request(APPLICATION_JSON_TYPE)
+            .accept("application/vnd.gravitino.v1+json")
+            .get();
+
+    Assertions.assertEquals(Response.Status.CONFLICT.getStatusCode(), 
resp3.getStatus());
+
+    ErrorResponse errorResp2 = resp3.readEntity(ErrorResponse.class);
+    Assertions.assertEquals(ErrorConstants.NOT_IN_USE_CODE, 
errorResp2.getCode());
+    Assertions.assertEquals(MetalakeNotInUseException.class.getSimpleName(), 
errorResp2.getType());
+
+    // Test throw RuntimeException
+    doThrow(new RuntimeException("mock error"))
+        .when(jobOperationDispatcher)
+        .listJobTemplates(metalake);
+
+    Response resp4 =
+        target(jobTemplatePath())
+            .request(APPLICATION_JSON_TYPE)
+            .accept("application/vnd.gravitino.v1+json")
+            .get();
+
+    Assertions.assertEquals(
+        Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), 
resp4.getStatus());
+    ErrorResponse errorResp3 = resp4.readEntity(ErrorResponse.class);
+    Assertions.assertEquals(ErrorConstants.INTERNAL_ERROR_CODE, 
errorResp3.getCode());
+    Assertions.assertEquals(RuntimeException.class.getSimpleName(), 
errorResp3.getType());
+  }
+
+  @Test
+  public void testRegisterJobTemplate() {
+    JobTemplateEntity template =
+        newShellJobTemplateEntity("shell_template_1", "Test Shell Template 1");
+    JobTemplateDTO templateDTO = JobOperations.toDTO(template);
+    JobTemplateRegisterRequest request = new 
JobTemplateRegisterRequest(templateDTO);
+
+    doNothing().when(jobOperationDispatcher).registerJobTemplate(metalake, 
template);
+
+    Response resp =
+        target(jobTemplatePath())
+            .request(APPLICATION_JSON_TYPE)
+            .accept("application/vnd.gravitino.v1+json")
+            .post(Entity.json(request));
+
+    Assertions.assertEquals(Response.Status.OK.getStatusCode(), 
resp.getStatus());
+    Assertions.assertEquals(APPLICATION_JSON_TYPE, resp.getMediaType());
+
+    BaseResponse baseResp = resp.readEntity(BaseResponse.class);
+    Assertions.assertEquals(0, baseResp.getCode());
+
+    // Test throw NoSuchMetalakeException
+    doThrow(new NoSuchMetalakeException("mock error"))
+        .when(jobOperationDispatcher)
+        .registerJobTemplate(any(), any());
+
+    Response resp2 =
+        target(jobTemplatePath())
+            .request(APPLICATION_JSON_TYPE)
+            .accept("application/vnd.gravitino.v1+json")
+            .post(Entity.entity(request, APPLICATION_JSON_TYPE));
+
+    Assertions.assertEquals(Response.Status.NOT_FOUND.getStatusCode(), 
resp2.getStatus());
+
+    ErrorResponse errorResp = resp2.readEntity(ErrorResponse.class);
+    Assertions.assertEquals(ErrorConstants.NOT_FOUND_CODE, 
errorResp.getCode());
+    Assertions.assertEquals(NoSuchMetalakeException.class.getSimpleName(), 
errorResp.getType());
+
+    // Test throw MetalakeNotInUseException
+    doThrow(new MetalakeNotInUseException("mock error"))
+        .when(jobOperationDispatcher)
+        .registerJobTemplate(any(), any());
+
+    Response resp3 =
+        target(jobTemplatePath())
+            .request(APPLICATION_JSON_TYPE)
+            .accept("application/vnd.gravitino.v1+json")
+            .post(Entity.entity(request, APPLICATION_JSON_TYPE));
+
+    Assertions.assertEquals(Response.Status.CONFLICT.getStatusCode(), 
resp3.getStatus());
+
+    ErrorResponse errorResp2 = resp3.readEntity(ErrorResponse.class);
+    Assertions.assertEquals(ErrorConstants.NOT_IN_USE_CODE, 
errorResp2.getCode());
+    Assertions.assertEquals(MetalakeNotInUseException.class.getSimpleName(), 
errorResp2.getType());
+
+    // Test throw JobTemplateAlreadyExistsException
+    doThrow(new JobTemplateAlreadyExistsException("mock error"))
+        .when(jobOperationDispatcher)
+        .registerJobTemplate(any(), any());
+    Response resp4 =
+        target(jobTemplatePath())
+            .request(APPLICATION_JSON_TYPE)
+            .accept("application/vnd.gravitino.v1+json")
+            .post(Entity.entity(request, APPLICATION_JSON_TYPE));
+
+    Assertions.assertEquals(Response.Status.CONFLICT.getStatusCode(), 
resp4.getStatus());
+    ErrorResponse errorResp3 = resp4.readEntity(ErrorResponse.class);
+    Assertions.assertEquals(ErrorConstants.ALREADY_EXISTS_CODE, 
errorResp3.getCode());
+    Assertions.assertEquals(
+        JobTemplateAlreadyExistsException.class.getSimpleName(), 
errorResp3.getType());
+
+    // Test throw RuntimeException
+    doThrow(new RuntimeException("mock error"))
+        .when(jobOperationDispatcher)
+        .registerJobTemplate(any(), any());
+
+    Response resp5 =
+        target(jobTemplatePath())
+            .request(APPLICATION_JSON_TYPE)
+            .accept("application/vnd.gravitino.v1+json")
+            .post(Entity.entity(request, APPLICATION_JSON_TYPE));
+
+    Assertions.assertEquals(
+        Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), 
resp5.getStatus());
+
+    ErrorResponse errorResp4 = resp5.readEntity(ErrorResponse.class);
+    Assertions.assertEquals(ErrorConstants.INTERNAL_ERROR_CODE, 
errorResp4.getCode());
+    Assertions.assertEquals(RuntimeException.class.getSimpleName(), 
errorResp4.getType());
+  }
+
+  @Test
+  public void testGetJobTemplate() {
+    JobTemplateEntity template =
+        newShellJobTemplateEntity("shell_template_1", "Test Shell Template 1");
+
+    when(jobOperationDispatcher.getJobTemplate(metalake, 
template.name())).thenReturn(template);
+
+    Response resp =
+        target(jobTemplatePath())
+            .path(template.name())
+            .request(APPLICATION_JSON_TYPE)
+            .accept("application/vnd.gravitino.v1+json")
+            .get();
+
+    Assertions.assertEquals(Response.Status.OK.getStatusCode(), 
resp.getStatus());
+    Assertions.assertEquals(APPLICATION_JSON_TYPE, resp.getMediaType());
+
+    JobTemplateResponse jobTemplateResp = 
resp.readEntity(JobTemplateResponse.class);
+    Assertions.assertEquals(JobOperations.toDTO(template), 
jobTemplateResp.getJobTemplate());
+
+    // Test throw NoSuchMetalakeException
+    doThrow(new NoSuchMetalakeException("mock error"))
+        .when(jobOperationDispatcher)
+        .getJobTemplate(any(), any());
+
+    Response resp2 =
+        target(jobTemplatePath())
+            .path(template.name())
+            .request(APPLICATION_JSON_TYPE)
+            .accept("application/vnd.gravitino.v1+json")
+            .get();
+
+    Assertions.assertEquals(Response.Status.NOT_FOUND.getStatusCode(), 
resp2.getStatus());
+
+    ErrorResponse errorResp = resp2.readEntity(ErrorResponse.class);
+    Assertions.assertEquals(ErrorConstants.NOT_FOUND_CODE, 
errorResp.getCode());
+    Assertions.assertEquals(NoSuchMetalakeException.class.getSimpleName(), 
errorResp.getType());
+
+    // Test throw MetalakeNotInUseException
+    doThrow(new MetalakeNotInUseException("mock error"))
+        .when(jobOperationDispatcher)
+        .getJobTemplate(any(), any());
+
+    Response resp3 =
+        target(jobTemplatePath())
+            .path(template.name())
+            .request(APPLICATION_JSON_TYPE)
+            .accept("application/vnd.gravitino.v1+json")
+            .get();
+
+    Assertions.assertEquals(Response.Status.CONFLICT.getStatusCode(), 
resp3.getStatus());
+
+    ErrorResponse errorResp2 = resp3.readEntity(ErrorResponse.class);
+    Assertions.assertEquals(ErrorConstants.NOT_IN_USE_CODE, 
errorResp2.getCode());
+    Assertions.assertEquals(MetalakeNotInUseException.class.getSimpleName(), 
errorResp2.getType());
+
+    // Test throw RuntimeException
+    doThrow(new RuntimeException("mock error"))
+        .when(jobOperationDispatcher)
+        .getJobTemplate(any(), any());
+
+    Response resp4 =
+        target(jobTemplatePath())
+            .path(template.name())
+            .request(APPLICATION_JSON_TYPE)
+            .accept("application/vnd.gravitino.v1+json")
+            .get();
+
+    Assertions.assertEquals(
+        Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), 
resp4.getStatus());
+    ErrorResponse errorResp3 = resp4.readEntity(ErrorResponse.class);
+    Assertions.assertEquals(ErrorConstants.INTERNAL_ERROR_CODE, 
errorResp3.getCode());
+    Assertions.assertEquals(RuntimeException.class.getSimpleName(), 
errorResp3.getType());
+
+    // Test throw NoSuchJobTemplateException
+    doThrow(new NoSuchJobTemplateException("mock error"))
+        .when(jobOperationDispatcher)
+        .getJobTemplate(any(), any());
+
+    Response resp5 =
+        target(jobTemplatePath())
+            .path(template.name())
+            .request(APPLICATION_JSON_TYPE)
+            .accept("application/vnd.gravitino.v1+json")
+            .get();
+
+    Assertions.assertEquals(Response.Status.NOT_FOUND.getStatusCode(), 
resp5.getStatus());
+    ErrorResponse errorResp4 = resp5.readEntity(ErrorResponse.class);
+    Assertions.assertEquals(ErrorConstants.NOT_FOUND_CODE, 
errorResp4.getCode());
+    Assertions.assertEquals(NoSuchJobTemplateException.class.getSimpleName(), 
errorResp4.getType());
+  }
+
+  @Test
+  public void testDeleteJobTemplate() {
+    String templateName = "shell_template_1";
+
+    when(jobOperationDispatcher.deleteJobTemplate(metalake, 
templateName)).thenReturn(true);
+
+    Response resp =
+        target(jobTemplatePath())
+            .path(templateName)
+            .request(APPLICATION_JSON_TYPE)
+            .accept("application/vnd.gravitino.v1+json")
+            .delete();
+
+    Assertions.assertEquals(Response.Status.OK.getStatusCode(), 
resp.getStatus());
+    Assertions.assertEquals(APPLICATION_JSON_TYPE, resp.getMediaType());
+
+    DropResponse dropResp = resp.readEntity(DropResponse.class);
+    Assertions.assertEquals(0, dropResp.getCode());
+
+    Assertions.assertTrue(dropResp.dropped());
+
+    // Test throw NoSuchMetalakeException
+    doThrow(new NoSuchMetalakeException("mock error"))
+        .when(jobOperationDispatcher)
+        .deleteJobTemplate(any(), any());
+
+    Response resp2 =
+        target(jobTemplatePath())
+            .path(templateName)
+            .request(APPLICATION_JSON_TYPE)
+            .accept("application/vnd.gravitino.v1+json")
+            .delete();
+
+    Assertions.assertEquals(Response.Status.NOT_FOUND.getStatusCode(), 
resp2.getStatus());
+
+    ErrorResponse errorResp = resp2.readEntity(ErrorResponse.class);
+    Assertions.assertEquals(ErrorConstants.NOT_FOUND_CODE, 
errorResp.getCode());
+    Assertions.assertEquals(NoSuchMetalakeException.class.getSimpleName(), 
errorResp.getType());
+
+    // Test throw MetalakeNotInUseException
+    doThrow(new MetalakeNotInUseException("mock error"))
+        .when(jobOperationDispatcher)
+        .deleteJobTemplate(any(), any());
+
+    Response resp3 =
+        target(jobTemplatePath())
+            .path(templateName)
+            .request(APPLICATION_JSON_TYPE)
+            .accept("application/vnd.gravitino.v1+json")
+            .delete();
+
+    Assertions.assertEquals(Response.Status.CONFLICT.getStatusCode(), 
resp3.getStatus());
+
+    ErrorResponse errorResp2 = resp3.readEntity(ErrorResponse.class);
+    Assertions.assertEquals(ErrorConstants.NOT_IN_USE_CODE, 
errorResp2.getCode());
+    Assertions.assertEquals(MetalakeNotInUseException.class.getSimpleName(), 
errorResp2.getType());
+
+    // Test throw RuntimeException
+    doThrow(new RuntimeException("mock error"))
+        .when(jobOperationDispatcher)
+        .deleteJobTemplate(any(), any());
+
+    Response resp4 =
+        target(jobTemplatePath())
+            .path(templateName)
+            .request(APPLICATION_JSON_TYPE)
+            .accept("application/vnd.gravitino.v1+json")
+            .delete();
+    Assertions.assertEquals(
+        Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), 
resp4.getStatus());
+    ErrorResponse errorResp3 = resp4.readEntity(ErrorResponse.class);
+    Assertions.assertEquals(ErrorConstants.INTERNAL_ERROR_CODE, 
errorResp3.getCode());
+    Assertions.assertEquals(RuntimeException.class.getSimpleName(), 
errorResp3.getType());
+
+    // Test throw InUseException
+    doThrow(new InUseException("mock error"))
+        .when(jobOperationDispatcher)
+        .deleteJobTemplate(any(), any());
+
+    Response resp5 =
+        target(jobTemplatePath())
+            .path(templateName)
+            .request(APPLICATION_JSON_TYPE)
+            .accept("application/vnd.gravitino.v1+json")
+            .delete();
+
+    Assertions.assertEquals(Response.Status.CONFLICT.getStatusCode(), 
resp5.getStatus());
+
+    ErrorResponse errorResp4 = resp5.readEntity(ErrorResponse.class);
+    Assertions.assertEquals(ErrorConstants.IN_USE_CODE, errorResp4.getCode());
+    Assertions.assertEquals(InUseException.class.getSimpleName(), 
errorResp4.getType());
+  }
+
+  private String jobTemplatePath() {
+    return "/metalakes/" + metalake + "/jobs/templates";
+  }
+
+  private JobTemplateEntity newShellJobTemplateEntity(String name, String 
comment) {
+    ShellJobTemplate shellJobTemplate =
+        ShellJobTemplate.builder()
+            .withName(name)
+            .withComment(comment)
+            .withExecutable("/bin/echo")
+            .build();
+
+    Random rand = new Random();
+    return JobTemplateEntity.builder()
+        .withId(rand.nextLong())
+        .withName(name)
+        .withNamespace(NamespaceUtil.ofJobTemplate(metalake))
+        
.withTemplateContent(JobTemplateEntity.TemplateContent.fromJobTemplate(shellJobTemplate))
+        .withAuditInfo(auditInfo)
+        .build();
+  }
+
+  private JobTemplateEntity newSparkJobTemplateEntity(String name, String 
comment) {
+    SparkJobTemplate sparkJobTemplate =
+        SparkJobTemplate.builder()
+            .withName(name)
+            .withComment(comment)
+            .withClassName("org.apache.spark.examples.SparkPi")
+            .withExecutable("file:/path/to/spark-examples.jar")
+            .build();
+
+    Random rand = new Random();
+    return JobTemplateEntity.builder()
+        .withId(rand.nextLong())
+        .withName(name)
+        .withNamespace(NamespaceUtil.ofJobTemplate(metalake))
+        
.withTemplateContent(JobTemplateEntity.TemplateContent.fromJobTemplate(sparkJobTemplate))
+        .withAuditInfo(auditInfo)
+        .build();
+  }
+}


Reply via email to