This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new c2363107e3 [#7758] feat(server): Add the REST interface for job system 
- part2 (#7939)
c2363107e3 is described below

commit c2363107e360ad45642b1d1fe840e688db4a15b8
Author: Jerry Shao <[email protected]>
AuthorDate: Wed Aug 6 14:09:54 2025 +0800

    [#7758] feat(server): Add the REST interface for job system - part2 (#7939)
    
    ### What changes were proposed in this pull request?
    
    Add the job related rest interface for Gravitino.
    
    ### Why are the changes needed?
    
    Fix: #7758
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Added UTs.
---
 .../java/org/apache/gravitino/dto/job/JobDTO.java  | 126 ++++++++++++
 .../gravitino/dto/requests/JobRunRequest.java      |  62 ++++++
 .../gravitino/dto/responses/JobListResponse.java   |  58 ++++++
 .../gravitino/dto/responses/JobResponse.java       |  57 ++++++
 .../server/web/rest/ExceptionHandlers.java         |  40 ++++
 .../gravitino/server/web/rest/JobOperations.java   | 129 +++++++++++-
 .../gravitino/server/web/rest/OperationType.java   |   4 +-
 .../server/web/rest/TestJobOperations.java         | 220 +++++++++++++++++++++
 8 files changed, 693 insertions(+), 3 deletions(-)

diff --git a/common/src/main/java/org/apache/gravitino/dto/job/JobDTO.java 
b/common/src/main/java/org/apache/gravitino/dto/job/JobDTO.java
new file mode 100644
index 0000000000..9d57b2c259
--- /dev/null
+++ b/common/src/main/java/org/apache/gravitino/dto/job/JobDTO.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.dto.job;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.ToString;
+import lombok.experimental.Accessors;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.dto.AuditDTO;
+import org.apache.gravitino.job.JobHandle;
+
+/** Represents a Job Data Transfer Object (DTO). */
+@Getter
+@Accessors(fluent = true)
+@EqualsAndHashCode
+@ToString
+public class JobDTO {
+
+  @JsonProperty("jobId")
+  private final String jobId;
+
+  @JsonProperty("jobTemplateName")
+  private final String jobTemplateName;
+
+  @JsonProperty("status")
+  @JsonSerialize(using = JobDTO.StatusSerializer.class)
+  @JsonDeserialize(using = JobDTO.StatusDeserializer.class)
+  private final JobHandle.Status status;
+
+  @JsonProperty("audit")
+  private final AuditDTO audit;
+
+  /** Default constructor for Jackson deserialization. */
+  private JobDTO() {
+    this(null, null, null, null);
+  }
+
+  /**
+   * Creates a new JobPO with the specified properties.
+   *
+   * @param jobId The unique identifier for the job.
+   * @param jobTemplateName The name of the job template used for this job.
+   * @param status The current status of the job.
+   * @param audit The audit information associated with the job.
+   */
+  public JobDTO(String jobId, String jobTemplateName, JobHandle.Status status, 
AuditDTO audit) {
+    this.jobId = jobId;
+    this.jobTemplateName = jobTemplateName;
+    this.status = status;
+    this.audit = audit;
+  }
+
+  /**
+   * Validates the JobDTO.
+   *
+   * @throws IllegalArgumentException if any of the required fields are 
invalid.
+   */
+  public void validate() {
+    Preconditions.checkArgument(
+        StringUtils.isNotBlank(jobId), "\"jobId\" is required and cannot be 
empty");
+    Preconditions.checkArgument(
+        StringUtils.isNotBlank(jobTemplateName),
+        "\"jobTemplateName\" is required and cannot be empty");
+    Preconditions.checkArgument(status != null, "\"status\" must not be null");
+    Preconditions.checkArgument(audit != null, "\"audit\" must not be null");
+  }
+
+  /** Deserializer for Status */
+  public static class StatusDeserializer extends 
JsonDeserializer<JobHandle.Status> {
+
+    @Override
+    public JobHandle.Status deserialize(JsonParser p, DeserializationContext 
ctxt)
+        throws IOException {
+      String value = p.getText();
+      if (value != null) {
+        try {
+          return JobHandle.Status.valueOf(value.toUpperCase());
+        } catch (IllegalArgumentException e) {
+          throw ctxt.weirdStringException(value, JobHandle.Status.class, 
"Invalid status value");
+        }
+      }
+
+      throw ctxt.weirdStringException(
+          null, JobHandle.Status.class, "status cannot be null or empty");
+    }
+  }
+
+  /** Serializer for Status. */
+  public static class StatusSerializer extends 
JsonSerializer<JobHandle.Status> {
+
+    @Override
+    public void serialize(JobHandle.Status value, JsonGenerator gen, 
SerializerProvider serializers)
+        throws IOException {
+      Preconditions.checkArgument(value != null, "\"status\" cannot be null");
+      gen.writeString(value.name().toLowerCase());
+    }
+  }
+}
diff --git 
a/common/src/main/java/org/apache/gravitino/dto/requests/JobRunRequest.java 
b/common/src/main/java/org/apache/gravitino/dto/requests/JobRunRequest.java
new file mode 100644
index 0000000000..c51a8335a1
--- /dev/null
+++ b/common/src/main/java/org/apache/gravitino/dto/requests/JobRunRequest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.dto.requests;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import java.util.Map;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.rest.RESTRequest;
+
+/** Represents a request to run a job using a specified job template. */
+@Getter
+@EqualsAndHashCode
+public class JobRunRequest implements RESTRequest {
+
+  @JsonProperty("jobTemplateName")
+  private final String jobTemplateName;
+
+  @JsonProperty("jobConf")
+  private final Map<String, String> jobConf;
+
+  /**
+   * Creates a new JobRunRequest with the specified job template name and job 
configuration.
+   *
+   * @param jobTemplateName The name of the job template to use for running 
the job.
+   * @param jobConf A map containing the job configuration parameters.
+   */
+  public JobRunRequest(String jobTemplateName, Map<String, String> jobConf) {
+    this.jobTemplateName = jobTemplateName;
+    this.jobConf = jobConf;
+  }
+
+  /** Default constructor for Jackson deserialization. */
+  private JobRunRequest() {
+    this(null, null);
+  }
+
+  @Override
+  public void validate() throws IllegalArgumentException {
+    Preconditions.checkArgument(
+        StringUtils.isNotBlank(jobTemplateName),
+        "\"jobTemplateName\" is required and cannot be empty");
+  }
+}
diff --git 
a/common/src/main/java/org/apache/gravitino/dto/responses/JobListResponse.java 
b/common/src/main/java/org/apache/gravitino/dto/responses/JobListResponse.java
new file mode 100644
index 0000000000..50c4c422b5
--- /dev/null
+++ 
b/common/src/main/java/org/apache/gravitino/dto/responses/JobListResponse.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.dto.responses;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import java.util.List;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import org.apache.gravitino.dto.job.JobDTO;
+
+/** Represents a response containing a list of jobs. */
+@Getter
+@EqualsAndHashCode(callSuper = true)
+public class JobListResponse extends BaseResponse {
+
+  @JsonProperty("jobs")
+  private final List<JobDTO> jobs;
+
+  /**
+   * Creates a new JobListResponse with the specified list of jobs.
+   *
+   * @param jobs The list of jobs to include in the response.
+   */
+  public JobListResponse(List<JobDTO> jobs) {
+    super(0);
+    this.jobs = jobs;
+  }
+
+  /** Default constructor for Jackson deserialization. */
+  private JobListResponse() {
+    this(null);
+  }
+
+  @Override
+  public void validate() throws IllegalArgumentException {
+    super.validate();
+
+    Preconditions.checkArgument(jobs != null, "\"jobs\" must not be null");
+    jobs.forEach(JobDTO::validate);
+  }
+}
diff --git 
a/common/src/main/java/org/apache/gravitino/dto/responses/JobResponse.java 
b/common/src/main/java/org/apache/gravitino/dto/responses/JobResponse.java
new file mode 100644
index 0000000000..04b153c727
--- /dev/null
+++ b/common/src/main/java/org/apache/gravitino/dto/responses/JobResponse.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.dto.responses;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import org.apache.gravitino.dto.job.JobDTO;
+
+/** Represents a response containing a single job. */
+@Getter
+@EqualsAndHashCode(callSuper = true)
+public class JobResponse extends BaseResponse {
+
+  @JsonProperty("job")
+  private final JobDTO job;
+
+  /**
+   * Creates a new JobResponse with the specified job.
+   *
+   * @param job The job to include in the response.
+   */
+  public JobResponse(JobDTO job) {
+    super(0);
+    this.job = job;
+  }
+
+  /** Default constructor for Jackson deserialization. */
+  private JobResponse() {
+    this(null);
+  }
+
+  @Override
+  public void validate() throws IllegalArgumentException {
+    super.validate();
+
+    Preconditions.checkArgument(job != null, "\"job\" must not be null");
+    job.validate();
+  }
+}
diff --git 
a/server/src/main/java/org/apache/gravitino/server/web/rest/ExceptionHandlers.java
 
b/server/src/main/java/org/apache/gravitino/server/web/rest/ExceptionHandlers.java
index 04025fb902..4e6190a440 100644
--- 
a/server/src/main/java/org/apache/gravitino/server/web/rest/ExceptionHandlers.java
+++ 
b/server/src/main/java/org/apache/gravitino/server/web/rest/ExceptionHandlers.java
@@ -139,6 +139,11 @@ public class ExceptionHandlers {
     return JobTemplateExceptionHandler.INSTANCE.handle(op, jobTemplate, 
metalake, e);
   }
 
+  public static Response handleJobException(
+      OperationType op, String job, String metalake, Exception e) {
+    return JobExceptionHandler.INSTANCE.handle(op, job, metalake, e);
+  }
+
   public static Response handleTestConnectionException(Exception e) {
     ErrorResponse response;
     if (e instanceof IllegalArgumentException) {
@@ -821,6 +826,41 @@ public class ExceptionHandlers {
     }
   }
 
+  private static class JobExceptionHandler extends BaseExceptionHandler {
+
+    private static final ExceptionHandler INSTANCE = new JobExceptionHandler();
+
+    private static String getJobErrorMsg(
+        String jobTemplate, String operation, String parent, String reason) {
+      return String.format(
+          "Failed to operate job(s)%s operation [%s] under object [%s], reason 
[%s]",
+          jobTemplate, operation, parent, reason);
+    }
+
+    @Override
+    public Response handle(OperationType op, String jobTemplate, String 
parent, Exception e) {
+      String formatted = StringUtil.isBlank(jobTemplate) ? "" : " [" + 
jobTemplate + "]";
+      String errorMsg = getJobErrorMsg(formatted, op.name(), parent, 
getErrorMsg(e));
+      LOG.warn(errorMsg, e);
+
+      if (e instanceof IllegalArgumentException) {
+        return Utils.illegalArguments(errorMsg, e);
+
+      } else if (e instanceof NotFoundException) {
+        return Utils.notFound(errorMsg, e);
+
+      } else if (e instanceof NotInUseException) {
+        return Utils.notInUse(errorMsg, e);
+
+      } else if (e instanceof ForbiddenException) {
+        return Utils.forbidden(errorMsg, e);
+
+      } else {
+        return super.handle(op, jobTemplate, parent, e);
+      }
+    }
+  }
+
   @VisibleForTesting
   static class BaseExceptionHandler extends ExceptionHandler {
 
diff --git 
a/server/src/main/java/org/apache/gravitino/server/web/rest/JobOperations.java 
b/server/src/main/java/org/apache/gravitino/server/web/rest/JobOperations.java
index daf120c643..d1daed2a77 100644
--- 
a/server/src/main/java/org/apache/gravitino/server/web/rest/JobOperations.java
+++ 
b/server/src/main/java/org/apache/gravitino/server/web/rest/JobOperations.java
@@ -22,7 +22,10 @@ import com.codahale.metrics.annotation.ResponseMetered;
 import com.codahale.metrics.annotation.Timed;
 import com.google.common.annotations.VisibleForTesting;
 import java.time.Instant;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
+import java.util.Optional;
 import java.util.stream.Collectors;
 import javax.inject.Inject;
 import javax.servlet.http.HttpServletRequest;
@@ -37,18 +40,23 @@ import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.Response;
 import org.apache.gravitino.GravitinoEnv;
+import org.apache.gravitino.dto.job.JobDTO;
 import org.apache.gravitino.dto.job.JobTemplateDTO;
 import org.apache.gravitino.dto.job.ShellJobTemplateDTO;
 import org.apache.gravitino.dto.job.SparkJobTemplateDTO;
+import org.apache.gravitino.dto.requests.JobRunRequest;
 import org.apache.gravitino.dto.requests.JobTemplateRegisterRequest;
 import org.apache.gravitino.dto.responses.BaseResponse;
 import org.apache.gravitino.dto.responses.DropResponse;
+import org.apache.gravitino.dto.responses.JobListResponse;
+import org.apache.gravitino.dto.responses.JobResponse;
 import org.apache.gravitino.dto.responses.JobTemplateListResponse;
 import org.apache.gravitino.dto.responses.JobTemplateResponse;
 import org.apache.gravitino.dto.responses.NameListResponse;
 import org.apache.gravitino.dto.util.DTOConverters;
 import org.apache.gravitino.job.JobOperationDispatcher;
 import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.meta.JobEntity;
 import org.apache.gravitino.meta.JobTemplateEntity;
 import org.apache.gravitino.metrics.MetricNames;
 import org.apache.gravitino.server.web.Utils;
@@ -87,7 +95,7 @@ public class JobOperations {
           httpRequest,
           () -> {
             List<JobTemplateDTO> jobTemplates =
-                toDTOs(jobOperationDispatcher.listJobTemplates(metalake));
+                
toJobTemplateDTOs(jobOperationDispatcher.listJobTemplates(metalake));
             if (details) {
               LOG.info("List {} job templates in metalake: {}", 
jobTemplates.size(), metalake);
               return Utils.ok(new JobTemplateListResponse(jobTemplates));
@@ -191,7 +199,111 @@ public class JobOperations {
     }
   }
 
-  private static List<JobTemplateDTO> toDTOs(List<JobTemplateEntity> 
jobTemplateEntities) {
+  @GET
+  @Path("runs")
+  @Produces("application/vnd.gravitino.v1+json")
+  @Timed(name = "list-jobs." + MetricNames.HTTP_PROCESS_DURATION, absolute = 
true)
+  @ResponseMetered(name = "list-jobs", absolute = true)
+  public Response listJobs(
+      @PathParam("metalake") String metalake,
+      @QueryParam("jobTemplateName") String jobTemplateName) {
+    LOG.info(
+        "Received request to list jobs in metalake {}{}",
+        metalake,
+        jobTemplateName != null ? " for job template " + jobTemplateName : "");
+
+    try {
+      return Utils.doAs(
+          httpRequest,
+          () -> {
+            List<JobEntity> jobEntities =
+                jobOperationDispatcher.listJobs(metalake, 
Optional.ofNullable(jobTemplateName));
+            List<JobDTO> jobDTOs = toJobDTOs(jobEntities);
+
+            LOG.info("Listed {} jobs in metalake {}", jobEntities.size(), 
metalake);
+            return Utils.ok(new JobListResponse(jobDTOs));
+          });
+
+    } catch (Exception e) {
+      return ExceptionHandlers.handleJobException(OperationType.LIST, "", 
metalake, e);
+    }
+  }
+
+  @GET
+  @Path("runs/{jobId}")
+  @Produces("application/vnd.gravitino.v1+json")
+  @Timed(name = "get-job." + MetricNames.HTTP_PROCESS_DURATION, absolute = 
true)
+  @ResponseMetered(name = "get-job", absolute = true)
+  public Response getJob(@PathParam("metalake") String metalake, 
@PathParam("jobId") String jobId) {
+    LOG.info("Received request to get job {} in metalake {}", jobId, metalake);
+
+    try {
+      return Utils.doAs(
+          httpRequest,
+          () -> {
+            JobEntity jobEntity = jobOperationDispatcher.getJob(metalake, 
jobId);
+            LOG.info("Retrieved job {} in metalake: {}", jobId, metalake);
+            return Utils.ok(new JobResponse(toDTO(jobEntity)));
+          });
+
+    } catch (Exception e) {
+      return ExceptionHandlers.handleJobException(OperationType.GET, jobId, 
metalake, e);
+    }
+  }
+
+  @POST
+  @Path("runs")
+  @Produces("application/vnd.gravitino.v1+json")
+  @Timed(name = "run-job." + MetricNames.HTTP_PROCESS_DURATION, absolute = 
true)
+  @ResponseMetered(name = "run-job", absolute = true)
+  public Response runJob(@PathParam("metalake") String metalake, JobRunRequest 
request) {
+    LOG.info("Received request to run job in metalake: {}", metalake);
+
+    try {
+      return Utils.doAs(
+          httpRequest,
+          () -> {
+            request.validate();
+            Map<String, String> jobConf =
+                request.getJobConf() != null ? request.getJobConf() : 
Collections.emptyMap();
+
+            JobEntity jobEntity =
+                jobOperationDispatcher.runJob(metalake, 
request.getJobTemplateName(), jobConf);
+
+            LOG.info("Run job {} in metalake: {}", jobEntity.name(), metalake);
+            return Utils.ok(new JobResponse(toDTO(jobEntity)));
+          });
+
+    } catch (Exception e) {
+      return ExceptionHandlers.handleJobException(OperationType.RUN, "", 
metalake, e);
+    }
+  }
+
+  @POST
+  @Path("runs/{jobId}")
+  @Produces("application/vnd.gravitino.v1+json")
+  @Timed(name = "cancel-job." + MetricNames.HTTP_PROCESS_DURATION, absolute = 
true)
+  public Response cancelJob(
+      @PathParam("metalake") String metalake, @PathParam("jobId") String 
jobId) {
+    LOG.info("Received request to cancel job {} in metalake {}", jobId, 
metalake);
+
+    try {
+      return Utils.doAs(
+          httpRequest,
+          () -> {
+            JobEntity jobEntity = jobOperationDispatcher.cancelJob(metalake, 
jobId);
+
+            LOG.info("Cancelled job {} in metalake: {}", jobId, metalake);
+            return Utils.ok(new JobResponse(toDTO(jobEntity)));
+          });
+
+    } catch (Exception e) {
+      return ExceptionHandlers.handleJobException(OperationType.CANCEL, jobId, 
metalake, e);
+    }
+  }
+
+  private static List<JobTemplateDTO> toJobTemplateDTOs(
+      List<JobTemplateEntity> jobTemplateEntities) {
     return 
jobTemplateEntities.stream().map(JobOperations::toDTO).collect(Collectors.toList());
   }
 
@@ -250,4 +362,17 @@ public class JobOperations {
                 .build())
         .build();
   }
+
+  @VisibleForTesting
+  static JobDTO toDTO(JobEntity jobEntity) {
+    return new JobDTO(
+        jobEntity.name(),
+        jobEntity.jobTemplateName(),
+        jobEntity.status(),
+        DTOConverters.toDTO(jobEntity.auditInfo()));
+  }
+
+  private static List<JobDTO> toJobDTOs(List<JobEntity> jobEntities) {
+    return 
jobEntities.stream().map(JobOperations::toDTO).collect(Collectors.toList());
+  }
 }
diff --git 
a/server/src/main/java/org/apache/gravitino/server/web/rest/OperationType.java 
b/server/src/main/java/org/apache/gravitino/server/web/rest/OperationType.java
index 2b8abd91f1..7b9025eacb 100644
--- 
a/server/src/main/java/org/apache/gravitino/server/web/rest/OperationType.java
+++ 
b/server/src/main/java/org/apache/gravitino/server/web/rest/OperationType.java
@@ -37,5 +37,7 @@ public enum OperationType {
   SET,
   REGISTER, // An operation to register a model
   LIST_VERSIONS, // An operation to list versions of a model
-  LINK // An operation to link a version to a model
+  LINK, // An operation to link a version to a model
+  RUN, // An operation to run a job
+  CANCEL // An operation to cancel a job
 }
diff --git 
a/server/src/test/java/org/apache/gravitino/server/web/rest/TestJobOperations.java
 
b/server/src/test/java/org/apache/gravitino/server/web/rest/TestJobOperations.java
index a374edf21d..56f4be73a3 100644
--- 
a/server/src/test/java/org/apache/gravitino/server/web/rest/TestJobOperations.java
+++ 
b/server/src/test/java/org/apache/gravitino/server/web/rest/TestJobOperations.java
@@ -25,9 +25,12 @@ import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import java.io.IOException;
 import java.time.Instant;
+import java.util.Map;
+import java.util.Optional;
 import java.util.Random;
 import javax.servlet.http.HttpServletRequest;
 import javax.ws.rs.client.Entity;
@@ -36,23 +39,29 @@ import javax.ws.rs.core.Response;
 import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.gravitino.GravitinoEnv;
 import org.apache.gravitino.dto.job.JobTemplateDTO;
+import org.apache.gravitino.dto.requests.JobRunRequest;
 import org.apache.gravitino.dto.requests.JobTemplateRegisterRequest;
 import org.apache.gravitino.dto.responses.BaseResponse;
 import org.apache.gravitino.dto.responses.DropResponse;
 import org.apache.gravitino.dto.responses.ErrorConstants;
 import org.apache.gravitino.dto.responses.ErrorResponse;
+import org.apache.gravitino.dto.responses.JobListResponse;
+import org.apache.gravitino.dto.responses.JobResponse;
 import org.apache.gravitino.dto.responses.JobTemplateListResponse;
 import org.apache.gravitino.dto.responses.JobTemplateResponse;
 import org.apache.gravitino.dto.responses.NameListResponse;
 import org.apache.gravitino.exceptions.InUseException;
 import org.apache.gravitino.exceptions.JobTemplateAlreadyExistsException;
 import org.apache.gravitino.exceptions.MetalakeNotInUseException;
+import org.apache.gravitino.exceptions.NoSuchJobException;
 import org.apache.gravitino.exceptions.NoSuchJobTemplateException;
 import org.apache.gravitino.exceptions.NoSuchMetalakeException;
+import org.apache.gravitino.job.JobHandle;
 import org.apache.gravitino.job.JobOperationDispatcher;
 import org.apache.gravitino.job.ShellJobTemplate;
 import org.apache.gravitino.job.SparkJobTemplate;
 import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.meta.JobEntity;
 import org.apache.gravitino.meta.JobTemplateEntity;
 import org.apache.gravitino.rest.RESTUtils;
 import org.apache.gravitino.server.web.ObjectMapperProvider;
@@ -489,10 +498,208 @@ public class TestJobOperations extends JerseyTest {
     Assertions.assertEquals(InUseException.class.getSimpleName(), 
errorResp4.getType());
   }
 
+  @Test
+  public void testListJobs() {
+    String templateName = "shell_template_1";
+    JobEntity job1 = newJobEntity(templateName, JobHandle.Status.QUEUED);
+    JobEntity job2 = newJobEntity(templateName, JobHandle.Status.STARTED);
+    JobEntity job3 = newJobEntity("spark_template_1", 
JobHandle.Status.SUCCEEDED);
+
+    when(jobOperationDispatcher.listJobs(metalake, Optional.empty()))
+        .thenReturn(Lists.newArrayList(job1, job2, job3));
+
+    Response resp =
+        target(jobRunPath())
+            .request(APPLICATION_JSON_TYPE)
+            .accept("application/vnd.gravitino.v1+json")
+            .get();
+
+    Assertions.assertEquals(Response.Status.OK.getStatusCode(), 
resp.getStatus());
+    Assertions.assertEquals(APPLICATION_JSON_TYPE, resp.getMediaType());
+
+    JobListResponse jobListResponse = resp.readEntity(JobListResponse.class);
+    Assertions.assertEquals(0, jobListResponse.getCode());
+
+    Assertions.assertEquals(3, jobListResponse.getJobs().size());
+    Assertions.assertEquals(JobOperations.toDTO(job1), 
jobListResponse.getJobs().get(0));
+    Assertions.assertEquals(JobOperations.toDTO(job2), 
jobListResponse.getJobs().get(1));
+    Assertions.assertEquals(JobOperations.toDTO(job3), 
jobListResponse.getJobs().get(2));
+
+    // Test list jobs by template name
+    when(jobOperationDispatcher.listJobs(metalake, Optional.of(templateName)))
+        .thenReturn(Lists.newArrayList(job1, job2));
+
+    Response resp1 =
+        target(jobRunPath())
+            .queryParam("jobTemplateName", templateName)
+            .request(APPLICATION_JSON_TYPE)
+            .accept("application/vnd.gravitino.v1+json")
+            .get();
+
+    Assertions.assertEquals(Response.Status.OK.getStatusCode(), 
resp1.getStatus());
+    Assertions.assertEquals(APPLICATION_JSON_TYPE, resp1.getMediaType());
+
+    JobListResponse jobListResponse1 = resp1.readEntity(JobListResponse.class);
+    Assertions.assertEquals(0, jobListResponse1.getCode());
+    Assertions.assertEquals(2, jobListResponse1.getJobs().size());
+    Assertions.assertEquals(JobOperations.toDTO(job1), 
jobListResponse1.getJobs().get(0));
+    Assertions.assertEquals(JobOperations.toDTO(job2), 
jobListResponse1.getJobs().get(1));
+
+    // Test throw NoSuchMetalakeException
+    doThrow(new NoSuchMetalakeException("mock error"))
+        .when(jobOperationDispatcher)
+        .listJobs(any(), any());
+
+    Response resp2 =
+        target(jobRunPath())
+            .request(APPLICATION_JSON_TYPE)
+            .accept("application/vnd.gravitino.v1+json")
+            .get();
+
+    Assertions.assertEquals(Response.Status.NOT_FOUND.getStatusCode(), 
resp2.getStatus());
+
+    ErrorResponse errorResp = resp2.readEntity(ErrorResponse.class);
+    Assertions.assertEquals(ErrorConstants.NOT_FOUND_CODE, 
errorResp.getCode());
+    Assertions.assertEquals(NoSuchMetalakeException.class.getSimpleName(), 
errorResp.getType());
+
+    // Test throw MetalakeNotInUseException
+    doThrow(new MetalakeNotInUseException("mock error"))
+        .when(jobOperationDispatcher)
+        .listJobs(any(), any());
+
+    Response resp3 =
+        target(jobRunPath())
+            .request(APPLICATION_JSON_TYPE)
+            .accept("application/vnd.gravitino.v1+json")
+            .get();
+
+    Assertions.assertEquals(Response.Status.CONFLICT.getStatusCode(), 
resp3.getStatus());
+
+    ErrorResponse errorResp2 = resp3.readEntity(ErrorResponse.class);
+    Assertions.assertEquals(ErrorConstants.NOT_IN_USE_CODE, 
errorResp2.getCode());
+    Assertions.assertEquals(MetalakeNotInUseException.class.getSimpleName(), 
errorResp2.getType());
+
+    // Test NoSuchJobTemplateException
+    doThrow(new NoSuchJobTemplateException("mock error"))
+        .when(jobOperationDispatcher)
+        .listJobs(any(), any());
+
+    Response resp4 =
+        target(jobRunPath())
+            .request(APPLICATION_JSON_TYPE)
+            .accept("application/vnd.gravitino.v1+json")
+            .get();
+
+    Assertions.assertEquals(Response.Status.NOT_FOUND.getStatusCode(), 
resp4.getStatus());
+
+    ErrorResponse errorResp3 = resp4.readEntity(ErrorResponse.class);
+    Assertions.assertEquals(ErrorConstants.NOT_FOUND_CODE, 
errorResp3.getCode());
+    Assertions.assertEquals(NoSuchJobTemplateException.class.getSimpleName(), 
errorResp3.getType());
+
+    // Test throw RuntimeException
+    doThrow(new RuntimeException("mock 
error")).when(jobOperationDispatcher).listJobs(any(), any());
+
+    Response resp5 =
+        target(jobRunPath())
+            .request(APPLICATION_JSON_TYPE)
+            .accept("application/vnd.gravitino.v1+json")
+            .get();
+
+    Assertions.assertEquals(
+        Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), 
resp5.getStatus());
+
+    ErrorResponse errorResp4 = resp5.readEntity(ErrorResponse.class);
+    Assertions.assertEquals(ErrorConstants.INTERNAL_ERROR_CODE, 
errorResp4.getCode());
+    Assertions.assertEquals(RuntimeException.class.getSimpleName(), 
errorResp4.getType());
+  }
+
+  @Test
+  public void testRunJob() {
+    String templateName = "shell_template_1";
+    Map<String, String> jobConf = ImmutableMap.of("key1", "value1", "key2", 
"value2");
+    JobEntity job = newJobEntity(templateName, JobHandle.Status.QUEUED);
+    JobRunRequest req = new JobRunRequest(templateName, jobConf);
+
+    when(jobOperationDispatcher.runJob(metalake, templateName, 
jobConf)).thenReturn(job);
+
+    Response resp =
+        target(jobRunPath())
+            .request(APPLICATION_JSON_TYPE)
+            .accept("application/vnd.gravitino.v1+json")
+            .post(Entity.entity(req, APPLICATION_JSON_TYPE));
+
+    Assertions.assertEquals(Response.Status.OK.getStatusCode(), 
resp.getStatus());
+    Assertions.assertEquals(APPLICATION_JSON_TYPE, resp.getMediaType());
+
+    JobResponse jobResp = resp.readEntity(JobResponse.class);
+    Assertions.assertEquals(0, jobResp.getCode());
+    Assertions.assertEquals(JobOperations.toDTO(job), jobResp.getJob());
+
+    // Test throw NoSuchJobTemplateException
+    doThrow(new NoSuchJobTemplateException("mock error"))
+        .when(jobOperationDispatcher)
+        .runJob(any(), any(), any());
+
+    Response resp2 =
+        target(jobRunPath())
+            .request(APPLICATION_JSON_TYPE)
+            .accept("application/vnd.gravitino.v1+json")
+            .post(Entity.entity(req, APPLICATION_JSON_TYPE));
+
+    Assertions.assertEquals(Response.Status.NOT_FOUND.getStatusCode(), 
resp2.getStatus());
+
+    ErrorResponse errorResp = resp2.readEntity(ErrorResponse.class);
+    Assertions.assertEquals(ErrorConstants.NOT_FOUND_CODE, 
errorResp.getCode());
+    Assertions.assertEquals(NoSuchJobTemplateException.class.getSimpleName(), 
errorResp.getType());
+  }
+
+  @Test
+  public void testCancelJob() {
+    JobEntity job = newJobEntity("shell_template_1", JobHandle.Status.STARTED);
+
+    when(jobOperationDispatcher.cancelJob(metalake, 
job.name())).thenReturn(job);
+
+    Response resp =
+        target(jobRunPath())
+            .path(job.name())
+            .request(APPLICATION_JSON_TYPE)
+            .accept("application/vnd.gravitino.v1+json")
+            .post(null);
+
+    Assertions.assertEquals(Response.Status.OK.getStatusCode(), 
resp.getStatus());
+    Assertions.assertEquals(APPLICATION_JSON_TYPE, resp.getMediaType());
+
+    JobResponse jobResp = resp.readEntity(JobResponse.class);
+    Assertions.assertEquals(0, jobResp.getCode());
+    Assertions.assertEquals(JobOperations.toDTO(job), jobResp.getJob());
+
+    // Test throw NoSuchJobException
+    doThrow(new NoSuchJobException("mock error"))
+        .when(jobOperationDispatcher)
+        .cancelJob(any(), any());
+
+    Response resp2 =
+        target(jobRunPath())
+            .path(job.name())
+            .request(APPLICATION_JSON_TYPE)
+            .accept("application/vnd.gravitino.v1+json")
+            .post(null);
+
+    Assertions.assertEquals(Response.Status.NOT_FOUND.getStatusCode(), 
resp2.getStatus());
+
+    ErrorResponse errorResp = resp2.readEntity(ErrorResponse.class);
+    Assertions.assertEquals(ErrorConstants.NOT_FOUND_CODE, 
errorResp.getCode());
+    Assertions.assertEquals(NoSuchJobException.class.getSimpleName(), 
errorResp.getType());
+  }
+
   private String jobTemplatePath() {
     return "/metalakes/" + metalake + "/jobs/templates";
   }
 
+  private String jobRunPath() {
+    return "/metalakes/" + metalake + "/jobs/runs";
+  }
+
   private JobTemplateEntity newShellJobTemplateEntity(String name, String 
comment) {
     ShellJobTemplate shellJobTemplate =
         ShellJobTemplate.builder()
@@ -529,4 +736,17 @@ public class TestJobOperations extends JerseyTest {
         .withAuditInfo(auditInfo)
         .build();
   }
+
+  private JobEntity newJobEntity(String templateName, JobHandle.Status status) 
{
+    Random rand = new Random();
+    return JobEntity.builder()
+        .withId(rand.nextLong())
+        .withJobExecutionId(rand.nextLong() + "")
+        .withNamespace(NamespaceUtil.ofJob(metalake))
+        .withJobTemplateName(templateName)
+        .withStatus(status)
+        .withAuditInfo(
+            
AuditInfo.builder().withCreator("test").withCreateTime(Instant.now()).build())
+        .build();
+  }
 }


Reply via email to