This is an automated email from the ASF dual-hosted git repository.
jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new c2363107e3 [#7758] feat(server): Add the REST interface for job system
- part2 (#7939)
c2363107e3 is described below
commit c2363107e360ad45642b1d1fe840e688db4a15b8
Author: Jerry Shao <[email protected]>
AuthorDate: Wed Aug 6 14:09:54 2025 +0800
[#7758] feat(server): Add the REST interface for job system - part2 (#7939)
### What changes were proposed in this pull request?
Add the job-related REST interface for Gravitino.
### Why are the changes needed?
Fix: #7758
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Added UTs.
---
.../java/org/apache/gravitino/dto/job/JobDTO.java | 126 ++++++++++++
.../gravitino/dto/requests/JobRunRequest.java | 62 ++++++
.../gravitino/dto/responses/JobListResponse.java | 58 ++++++
.../gravitino/dto/responses/JobResponse.java | 57 ++++++
.../server/web/rest/ExceptionHandlers.java | 40 ++++
.../gravitino/server/web/rest/JobOperations.java | 129 +++++++++++-
.../gravitino/server/web/rest/OperationType.java | 4 +-
.../server/web/rest/TestJobOperations.java | 220 +++++++++++++++++++++
8 files changed, 693 insertions(+), 3 deletions(-)
diff --git a/common/src/main/java/org/apache/gravitino/dto/job/JobDTO.java
b/common/src/main/java/org/apache/gravitino/dto/job/JobDTO.java
new file mode 100644
index 0000000000..9d57b2c259
--- /dev/null
+++ b/common/src/main/java/org/apache/gravitino/dto/job/JobDTO.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.dto.job;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.util.Locale;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.ToString;
+import lombok.experimental.Accessors;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.dto.AuditDTO;
+import org.apache.gravitino.job.JobHandle;
+
+/**
+ * Represents a Job Data Transfer Object (DTO).
+ *
+ * <p>Carries the job id, the name of the job template the job was created from, the job status
+ * and the audit information. The status is written to JSON as the lower-case enum constant name
+ * and is read back case-insensitively.
+ */
+@Getter
+@Accessors(fluent = true)
+@EqualsAndHashCode
+@ToString
+public class JobDTO {
+
+ @JsonProperty("jobId")
+ private final String jobId;
+
+ @JsonProperty("jobTemplateName")
+ private final String jobTemplateName;
+
+ @JsonProperty("status")
+ @JsonSerialize(using = JobDTO.StatusSerializer.class)
+ @JsonDeserialize(using = JobDTO.StatusDeserializer.class)
+ private final JobHandle.Status status;
+
+ @JsonProperty("audit")
+ private final AuditDTO audit;
+
+ /** Default constructor for Jackson deserialization; every field starts out as {@code null}. */
+ private JobDTO() {
+ this(null, null, null, null);
+ }
+
+ /**
+ * Creates a new JobDTO with the specified properties.
+ *
+ * <p>No checks are performed here; call {@link #validate()} to verify the required fields.
+ *
+ * @param jobId The unique identifier for the job.
+ * @param jobTemplateName The name of the job template used for this job.
+ * @param status The current status of the job.
+ * @param audit The audit information associated with the job.
+ */
+ public JobDTO(String jobId, String jobTemplateName, JobHandle.Status status, AuditDTO audit) {
+ this.jobId = jobId;
+ this.jobTemplateName = jobTemplateName;
+ this.status = status;
+ this.audit = audit;
+ }
+
+ /**
+ * Validates the JobDTO.
+ *
+ * @throws IllegalArgumentException if {@code jobId} or {@code jobTemplateName} is blank, or if
+ * {@code status} or {@code audit} is {@code null}.
+ */
+ public void validate() {
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(jobId), "\"jobId\" is required and cannot be empty");
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(jobTemplateName),
+ "\"jobTemplateName\" is required and cannot be empty");
+ Preconditions.checkArgument(status != null, "\"status\" must not be null");
+ Preconditions.checkArgument(audit != null, "\"audit\" must not be null");
+ }
+
+ /** Deserializer for Status; matches the enum constant name case-insensitively. */
+ public static class StatusDeserializer extends JsonDeserializer<JobHandle.Status> {
+
+ /**
+ * Reads the current token text and maps it to a {@link JobHandle.Status}.
+ *
+ * @param p The parser positioned on the status value.
+ * @param ctxt The deserialization context, used to build the mapping exception.
+ * @return The status whose constant name equals the upper-cased text.
+ * @throws IOException if the text is {@code null} or does not name a status.
+ */
+ @Override
+ public JobHandle.Status deserialize(JsonParser p, DeserializationContext ctxt)
+ throws IOException {
+ String value = p.getText();
+ if (value != null) {
+ try {
+ // Locale.ROOT keeps the case mapping independent of the JVM default locale; under a
+ // Turkish default locale, for example, "i" upper-cases to a dotted capital I and the
+ // enum lookup would fail for otherwise valid input.
+ return JobHandle.Status.valueOf(value.toUpperCase(Locale.ROOT));
+ } catch (IllegalArgumentException e) {
+ throw ctxt.weirdStringException(value, JobHandle.Status.class, "Invalid status value");
+ }
+ }
+
+ throw ctxt.weirdStringException(
+ null, JobHandle.Status.class, "status cannot be null or empty");
+ }
+ }
+
+ /** Serializer for Status; writes the enum constant name in lower case. */
+ public static class StatusSerializer extends JsonSerializer<JobHandle.Status> {
+
+ /**
+ * Writes the status as a JSON string.
+ *
+ * @param value The status to write; must not be {@code null}.
+ * @param gen The generator the string is written to.
+ * @param serializers The serializer provider (unused).
+ * @throws IOException if the generator fails to write.
+ * @throws IllegalArgumentException if {@code value} is {@code null}.
+ */
+ @Override
+ public void serialize(JobHandle.Status value, JsonGenerator gen, SerializerProvider serializers)
+ throws IOException {
+ Preconditions.checkArgument(value != null, "\"status\" cannot be null");
+ // Locale.ROOT for the same reason as in the deserializer: the wire format must not depend
+ // on the default locale of the JVM that produces it.
+ gen.writeString(value.name().toLowerCase(Locale.ROOT));
+ }
+ }
+}
diff --git
a/common/src/main/java/org/apache/gravitino/dto/requests/JobRunRequest.java
b/common/src/main/java/org/apache/gravitino/dto/requests/JobRunRequest.java
new file mode 100644
index 0000000000..c51a8335a1
--- /dev/null
+++ b/common/src/main/java/org/apache/gravitino/dto/requests/JobRunRequest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.dto.requests;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import java.util.Map;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.rest.RESTRequest;
+
+/** Represents a request to run a job using a specified job template. */
+@Getter
+@EqualsAndHashCode
+public class JobRunRequest implements RESTRequest {
+
+ @JsonProperty("jobTemplateName")
+ private final String jobTemplateName;
+
+ @JsonProperty("jobConf")
+ private final Map<String, String> jobConf;
+
+ /**
+ * Creates a new JobRunRequest with the specified job template name and job
configuration.
+ *
+ * @param jobTemplateName The name of the job template to use for running
the job.
+ * @param jobConf A map containing the job configuration parameters.
+ */
+ public JobRunRequest(String jobTemplateName, Map<String, String> jobConf) {
+ this.jobTemplateName = jobTemplateName;
+ this.jobConf = jobConf;
+ }
+
+ /** Default constructor for Jackson deserialization. */
+ private JobRunRequest() {
+ this(null, null);
+ }
+
+ @Override
+ public void validate() throws IllegalArgumentException {
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(jobTemplateName),
+ "\"jobTemplateName\" is required and cannot be empty");
+ }
+}
diff --git
a/common/src/main/java/org/apache/gravitino/dto/responses/JobListResponse.java
b/common/src/main/java/org/apache/gravitino/dto/responses/JobListResponse.java
new file mode 100644
index 0000000000..50c4c422b5
--- /dev/null
+++
b/common/src/main/java/org/apache/gravitino/dto/responses/JobListResponse.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.dto.responses;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import java.util.List;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import org.apache.gravitino.dto.job.JobDTO;
+
+/** Response carrying a list of job DTOs; constructed with response code 0. */
+@Getter
+@EqualsAndHashCode(callSuper = true)
+public class JobListResponse extends BaseResponse {
+
+ @JsonProperty("jobs")
+ private final List<JobDTO> jobs;
+
+ /** No-arg constructor for Jackson deserialization; the job list starts out as {@code null}. */
+ private JobListResponse() {
+ this(null);
+ }
+
+ /**
+ * Builds a response around the given jobs.
+ *
+ * @param jobs The jobs to return to the caller.
+ */
+ public JobListResponse(List<JobDTO> jobs) {
+ super(0);
+ this.jobs = jobs;
+ }
+
+ /**
+ * Runs the base response validation, then validates the list and each job in it.
+ *
+ * @throws IllegalArgumentException if the list is {@code null} or any job is invalid.
+ */
+ @Override
+ public void validate() throws IllegalArgumentException {
+ super.validate();
+
+ Preconditions.checkArgument(jobs != null, "\"jobs\" must not be null");
+ for (JobDTO job : jobs) {
+ job.validate();
+ }
+ }
+}
diff --git
a/common/src/main/java/org/apache/gravitino/dto/responses/JobResponse.java
b/common/src/main/java/org/apache/gravitino/dto/responses/JobResponse.java
new file mode 100644
index 0000000000..04b153c727
--- /dev/null
+++ b/common/src/main/java/org/apache/gravitino/dto/responses/JobResponse.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.dto.responses;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import org.apache.gravitino.dto.job.JobDTO;
+
+/** Represents a response containing a single job. */
+@Getter
+@EqualsAndHashCode(callSuper = true)
+public class JobResponse extends BaseResponse {
+
+ @JsonProperty("job")
+ private final JobDTO job;
+
+ /**
+ * Creates a new JobResponse with the specified job.
+ *
+ * @param job The job to include in the response.
+ */
+ public JobResponse(JobDTO job) {
+ super(0);
+ this.job = job;
+ }
+
+ /** Default constructor for Jackson deserialization. */
+ private JobResponse() {
+ this(null);
+ }
+
+ @Override
+ public void validate() throws IllegalArgumentException {
+ super.validate();
+
+ Preconditions.checkArgument(job != null, "\"job\" must not be null");
+ job.validate();
+ }
+}
diff --git
a/server/src/main/java/org/apache/gravitino/server/web/rest/ExceptionHandlers.java
b/server/src/main/java/org/apache/gravitino/server/web/rest/ExceptionHandlers.java
index 04025fb902..4e6190a440 100644
---
a/server/src/main/java/org/apache/gravitino/server/web/rest/ExceptionHandlers.java
+++
b/server/src/main/java/org/apache/gravitino/server/web/rest/ExceptionHandlers.java
@@ -139,6 +139,11 @@ public class ExceptionHandlers {
return JobTemplateExceptionHandler.INSTANCE.handle(op, jobTemplate,
metalake, e);
}
+ /**
+ * Builds the error response for a failed job operation by delegating to the shared {@code
+ * JobExceptionHandler} instance.
+ *
+ * @param op the operation that failed
+ * @param job the job the operation targeted; callers pass an empty string when no single job
+ * applies
+ * @param metalake the metalake the operation ran under
+ * @param e the exception raised by the operation
+ * @return the response produced by the handler
+ */
+ public static Response handleJobException(
+ OperationType op, String job, String metalake, Exception e) {
+ return JobExceptionHandler.INSTANCE.handle(op, job, metalake, e);
+ }
+
public static Response handleTestConnectionException(Exception e) {
ErrorResponse response;
if (e instanceof IllegalArgumentException) {
@@ -821,6 +826,41 @@ public class ExceptionHandlers {
}
}
+ /**
+ * Exception handler for job operations. It logs the failure and maps the exception type to the
+ * matching error response, deferring to {@link BaseExceptionHandler} for any other type.
+ */
+ private static class JobExceptionHandler extends BaseExceptionHandler {
+
+ private static final ExceptionHandler INSTANCE = new JobExceptionHandler();
+
+ /** Formats the error message; {@code job} is the already bracketed job label, or empty. */
+ private static String getJobErrorMsg(
+ String job, String operation, String parent, String reason) {
+ return String.format(
+ "Failed to operate job(s)%s operation [%s] under object [%s], reason [%s]",
+ job, operation, parent, reason);
+ }
+
+ /**
+ * Logs the failure at WARN level and converts the exception into a response.
+ *
+ * @param op the operation that failed
+ * @param job the job name; left out of the message when blank
+ * @param parent the object the operation ran under
+ * @param e the exception to convert
+ * @return the error response matching the exception type
+ */
+ @Override
+ public Response handle(OperationType op, String job, String parent, Exception e) {
+ String jobLabel = StringUtil.isBlank(job) ? "" : " [" + job + "]";
+ String errorMsg = getJobErrorMsg(jobLabel, op.name(), parent, getErrorMsg(e));
+ LOG.warn(errorMsg, e);
+
+ // The checks run in the same order as before; the first matching type wins.
+ if (e instanceof IllegalArgumentException) {
+ return Utils.illegalArguments(errorMsg, e);
+ }
+ if (e instanceof NotFoundException) {
+ return Utils.notFound(errorMsg, e);
+ }
+ if (e instanceof NotInUseException) {
+ return Utils.notInUse(errorMsg, e);
+ }
+ if (e instanceof ForbiddenException) {
+ return Utils.forbidden(errorMsg, e);
+ }
+
+ return super.handle(op, job, parent, e);
+ }
+ }
+
@VisibleForTesting
static class BaseExceptionHandler extends ExceptionHandler {
diff --git
a/server/src/main/java/org/apache/gravitino/server/web/rest/JobOperations.java
b/server/src/main/java/org/apache/gravitino/server/web/rest/JobOperations.java
index daf120c643..d1daed2a77 100644
---
a/server/src/main/java/org/apache/gravitino/server/web/rest/JobOperations.java
+++
b/server/src/main/java/org/apache/gravitino/server/web/rest/JobOperations.java
@@ -22,7 +22,10 @@ import com.codahale.metrics.annotation.ResponseMetered;
import com.codahale.metrics.annotation.Timed;
import com.google.common.annotations.VisibleForTesting;
import java.time.Instant;
+import java.util.Collections;
import java.util.List;
+import java.util.Map;
+import java.util.Optional;
import java.util.stream.Collectors;
import javax.inject.Inject;
import javax.servlet.http.HttpServletRequest;
@@ -37,18 +40,23 @@ import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.Response;
import org.apache.gravitino.GravitinoEnv;
+import org.apache.gravitino.dto.job.JobDTO;
import org.apache.gravitino.dto.job.JobTemplateDTO;
import org.apache.gravitino.dto.job.ShellJobTemplateDTO;
import org.apache.gravitino.dto.job.SparkJobTemplateDTO;
+import org.apache.gravitino.dto.requests.JobRunRequest;
import org.apache.gravitino.dto.requests.JobTemplateRegisterRequest;
import org.apache.gravitino.dto.responses.BaseResponse;
import org.apache.gravitino.dto.responses.DropResponse;
+import org.apache.gravitino.dto.responses.JobListResponse;
+import org.apache.gravitino.dto.responses.JobResponse;
import org.apache.gravitino.dto.responses.JobTemplateListResponse;
import org.apache.gravitino.dto.responses.JobTemplateResponse;
import org.apache.gravitino.dto.responses.NameListResponse;
import org.apache.gravitino.dto.util.DTOConverters;
import org.apache.gravitino.job.JobOperationDispatcher;
import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.meta.JobEntity;
import org.apache.gravitino.meta.JobTemplateEntity;
import org.apache.gravitino.metrics.MetricNames;
import org.apache.gravitino.server.web.Utils;
@@ -87,7 +95,7 @@ public class JobOperations {
httpRequest,
() -> {
List<JobTemplateDTO> jobTemplates =
- toDTOs(jobOperationDispatcher.listJobTemplates(metalake));
+
toJobTemplateDTOs(jobOperationDispatcher.listJobTemplates(metalake));
if (details) {
LOG.info("List {} job templates in metalake: {}",
jobTemplates.size(), metalake);
return Utils.ok(new JobTemplateListResponse(jobTemplates));
@@ -191,7 +199,111 @@ public class JobOperations {
}
}
- private static List<JobTemplateDTO> toDTOs(List<JobTemplateEntity>
jobTemplateEntities) {
+ /**
+ * Lists the jobs in a metalake, optionally restricted to a single job template.
+ *
+ * @param metalake the metalake path parameter
+ * @param jobTemplateName the optional {@code jobTemplateName} query parameter; {@code null} is
+ * passed to the dispatcher as an empty {@link Optional}
+ * @return an OK response wrapping the job list, or the error response built by {@code
+ * ExceptionHandlers.handleJobException} when an exception is thrown
+ */
+ @GET
+ @Path("runs")
+ @Produces("application/vnd.gravitino.v1+json")
+ @Timed(name = "list-jobs." + MetricNames.HTTP_PROCESS_DURATION, absolute = true)
+ @ResponseMetered(name = "list-jobs", absolute = true)
+ public Response listJobs(
+ @PathParam("metalake") String metalake,
+ @QueryParam("jobTemplateName") String jobTemplateName) {
+ String templateSuffix = jobTemplateName != null ? " for job template " + jobTemplateName : "";
+ LOG.info("Received request to list jobs in metalake {}{}", metalake, templateSuffix);
+
+ try {
+ return Utils.doAs(
+ httpRequest,
+ () -> {
+ Optional<String> templateFilter = Optional.ofNullable(jobTemplateName);
+ List<JobEntity> jobEntities = jobOperationDispatcher.listJobs(metalake, templateFilter);
+ List<JobDTO> jobDTOs = toJobDTOs(jobEntities);
+
+ LOG.info("Listed {} jobs in metalake {}", jobEntities.size(), metalake);
+ return Utils.ok(new JobListResponse(jobDTOs));
+ });
+
+ } catch (Exception e) {
+ return ExceptionHandlers.handleJobException(OperationType.LIST, "", metalake, e);
+ }
+ }
+
+ /**
+ * Fetches a single job of a metalake by its id.
+ *
+ * @param metalake the metalake path parameter
+ * @param jobId the job id path parameter
+ * @return an OK response wrapping the job, or the error response built by {@code
+ * ExceptionHandlers.handleJobException} when an exception is thrown
+ */
+ @GET
+ @Path("runs/{jobId}")
+ @Produces("application/vnd.gravitino.v1+json")
+ @Timed(name = "get-job." + MetricNames.HTTP_PROCESS_DURATION, absolute = true)
+ @ResponseMetered(name = "get-job", absolute = true)
+ public Response getJob(@PathParam("metalake") String metalake, @PathParam("jobId") String jobId) {
+ LOG.info("Received request to get job {} in metalake {}", jobId, metalake);
+
+ Response response;
+ try {
+ response =
+ Utils.doAs(
+ httpRequest,
+ () -> {
+ JobEntity found = jobOperationDispatcher.getJob(metalake, jobId);
+ LOG.info("Retrieved job {} in metalake: {}", jobId, metalake);
+ return Utils.ok(new JobResponse(toDTO(found)));
+ });
+
+ } catch (Exception e) {
+ response = ExceptionHandlers.handleJobException(OperationType.GET, jobId, metalake, e);
+ }
+ return response;
+ }
+
+ /**
+ * Runs a job in a metalake from the job template named in the request.
+ *
+ * @param metalake the metalake path parameter
+ * @param request the run request; it is validated before use, and a {@code null} job
+ * configuration is replaced by an empty map
+ * @return an OK response wrapping the job returned by the dispatcher, or the error response
+ * built by {@code ExceptionHandlers.handleJobException} when an exception is thrown
+ */
+ @POST
+ @Path("runs")
+ @Produces("application/vnd.gravitino.v1+json")
+ @Timed(name = "run-job." + MetricNames.HTTP_PROCESS_DURATION, absolute = true)
+ @ResponseMetered(name = "run-job", absolute = true)
+ public Response runJob(@PathParam("metalake") String metalake, JobRunRequest request) {
+ LOG.info("Received request to run job in metalake: {}", metalake);
+
+ try {
+ return Utils.doAs(
+ httpRequest,
+ () -> {
+ request.validate();
+ String templateName = request.getJobTemplateName();
+ Map<String, String> jobConf =
+ Optional.ofNullable(request.getJobConf()).orElse(Collections.emptyMap());
+
+ JobEntity started = jobOperationDispatcher.runJob(metalake, templateName, jobConf);
+
+ LOG.info("Run job {} in metalake: {}", started.name(), metalake);
+ return Utils.ok(new JobResponse(toDTO(started)));
+ });
+
+ } catch (Exception e) {
+ return ExceptionHandlers.handleJobException(OperationType.RUN, "", metalake, e);
+ }
+ }
+
+ /**
+ * Cancels a job of a metalake by its id.
+ *
+ * @param metalake the metalake path parameter
+ * @param jobId the job id path parameter
+ * @return an OK response wrapping the job returned by the dispatcher, or the error response
+ * built by {@code ExceptionHandlers.handleJobException} when an exception is thrown
+ */
+ @POST
+ @Path("runs/{jobId}")
+ @Produces("application/vnd.gravitino.v1+json")
+ @Timed(name = "cancel-job." + MetricNames.HTTP_PROCESS_DURATION, absolute = true)
+ // Response-code metering was missing here although list-jobs, get-job and run-job all have it.
+ @ResponseMetered(name = "cancel-job", absolute = true)
+ public Response cancelJob(
+ @PathParam("metalake") String metalake, @PathParam("jobId") String jobId) {
+ LOG.info("Received request to cancel job {} in metalake {}", jobId, metalake);
+
+ try {
+ return Utils.doAs(
+ httpRequest,
+ () -> {
+ JobEntity jobEntity = jobOperationDispatcher.cancelJob(metalake, jobId);
+
+ LOG.info("Cancelled job {} in metalake: {}", jobId, metalake);
+ return Utils.ok(new JobResponse(toDTO(jobEntity)));
+ });
+
+ } catch (Exception e) {
+ return ExceptionHandlers.handleJobException(OperationType.CANCEL, jobId, metalake, e);
+ }
+ }
+
+ private static List<JobTemplateDTO> toJobTemplateDTOs(
+ List<JobTemplateEntity> jobTemplateEntities) {
return
jobTemplateEntities.stream().map(JobOperations::toDTO).collect(Collectors.toList());
}
@@ -250,4 +362,17 @@ public class JobOperations {
.build())
.build();
}
+
+ /**
+ * Converts a job entity into its DTO. The entity name becomes the DTO's job id; the template
+ * name and status are copied, and the audit info is converted through {@code DTOConverters}.
+ *
+ * @param jobEntity the entity to convert
+ * @return the DTO built from the entity
+ */
+ @VisibleForTesting
+ static JobDTO toDTO(JobEntity jobEntity) {
+ String jobId = jobEntity.name();
+ String templateName = jobEntity.jobTemplateName();
+ return new JobDTO(
+ jobId, templateName, jobEntity.status(), DTOConverters.toDTO(jobEntity.auditInfo()));
+ }
+
+ /** Converts every job entity in the list to its DTO and collects the results into a list. */
+ private static List<JobDTO> toJobDTOs(List<JobEntity> jobEntities) {
+ return jobEntities.stream()
+ .map(jobEntity -> toDTO(jobEntity))
+ .collect(Collectors.toList());
+ }
}
diff --git
a/server/src/main/java/org/apache/gravitino/server/web/rest/OperationType.java
b/server/src/main/java/org/apache/gravitino/server/web/rest/OperationType.java
index 2b8abd91f1..7b9025eacb 100644
---
a/server/src/main/java/org/apache/gravitino/server/web/rest/OperationType.java
+++
b/server/src/main/java/org/apache/gravitino/server/web/rest/OperationType.java
@@ -37,5 +37,7 @@ public enum OperationType {
SET,
REGISTER, // An operation to register a model
LIST_VERSIONS, // An operation to list versions of a model
- LINK // An operation to link a version to a model
+ LINK, // An operation to link a version to a model
+ RUN, // An operation to run a job
+ CANCEL // An operation to cancel a job
}
diff --git
a/server/src/test/java/org/apache/gravitino/server/web/rest/TestJobOperations.java
b/server/src/test/java/org/apache/gravitino/server/web/rest/TestJobOperations.java
index a374edf21d..56f4be73a3 100644
---
a/server/src/test/java/org/apache/gravitino/server/web/rest/TestJobOperations.java
+++
b/server/src/test/java/org/apache/gravitino/server/web/rest/TestJobOperations.java
@@ -25,9 +25,12 @@ import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.time.Instant;
+import java.util.Map;
+import java.util.Optional;
import java.util.Random;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.client.Entity;
@@ -36,23 +39,29 @@ import javax.ws.rs.core.Response;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.gravitino.GravitinoEnv;
import org.apache.gravitino.dto.job.JobTemplateDTO;
+import org.apache.gravitino.dto.requests.JobRunRequest;
import org.apache.gravitino.dto.requests.JobTemplateRegisterRequest;
import org.apache.gravitino.dto.responses.BaseResponse;
import org.apache.gravitino.dto.responses.DropResponse;
import org.apache.gravitino.dto.responses.ErrorConstants;
import org.apache.gravitino.dto.responses.ErrorResponse;
+import org.apache.gravitino.dto.responses.JobListResponse;
+import org.apache.gravitino.dto.responses.JobResponse;
import org.apache.gravitino.dto.responses.JobTemplateListResponse;
import org.apache.gravitino.dto.responses.JobTemplateResponse;
import org.apache.gravitino.dto.responses.NameListResponse;
import org.apache.gravitino.exceptions.InUseException;
import org.apache.gravitino.exceptions.JobTemplateAlreadyExistsException;
import org.apache.gravitino.exceptions.MetalakeNotInUseException;
+import org.apache.gravitino.exceptions.NoSuchJobException;
import org.apache.gravitino.exceptions.NoSuchJobTemplateException;
import org.apache.gravitino.exceptions.NoSuchMetalakeException;
+import org.apache.gravitino.job.JobHandle;
import org.apache.gravitino.job.JobOperationDispatcher;
import org.apache.gravitino.job.ShellJobTemplate;
import org.apache.gravitino.job.SparkJobTemplate;
import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.meta.JobEntity;
import org.apache.gravitino.meta.JobTemplateEntity;
import org.apache.gravitino.rest.RESTUtils;
import org.apache.gravitino.server.web.ObjectMapperProvider;
@@ -489,10 +498,208 @@ public class TestJobOperations extends JerseyTest {
Assertions.assertEquals(InUseException.class.getSimpleName(),
errorResp4.getType());
}
+ /**
+ * Exercises GET on the job runs path: the unfiltered listing, the listing filtered by the
+ * jobTemplateName query parameter, and the error responses produced when the dispatcher throws.
+ */
+ @Test
+ public void testListJobs() {
+ String templateName = "shell_template_1";
+ JobEntity job1 = newJobEntity(templateName, JobHandle.Status.QUEUED);
+ JobEntity job2 = newJobEntity(templateName, JobHandle.Status.STARTED);
+ JobEntity job3 = newJobEntity("spark_template_1",
JobHandle.Status.SUCCEEDED);
+
+ // Unfiltered listing: the dispatcher is stubbed for an empty template filter.
+ when(jobOperationDispatcher.listJobs(metalake, Optional.empty()))
+ .thenReturn(Lists.newArrayList(job1, job2, job3));
+
+ Response resp =
+ target(jobRunPath())
+ .request(APPLICATION_JSON_TYPE)
+ .accept("application/vnd.gravitino.v1+json")
+ .get();
+
+ Assertions.assertEquals(Response.Status.OK.getStatusCode(),
resp.getStatus());
+ Assertions.assertEquals(APPLICATION_JSON_TYPE, resp.getMediaType());
+
+ JobListResponse jobListResponse = resp.readEntity(JobListResponse.class);
+ Assertions.assertEquals(0, jobListResponse.getCode());
+
+ Assertions.assertEquals(3, jobListResponse.getJobs().size());
+ Assertions.assertEquals(JobOperations.toDTO(job1),
jobListResponse.getJobs().get(0));
+ Assertions.assertEquals(JobOperations.toDTO(job2),
jobListResponse.getJobs().get(1));
+ Assertions.assertEquals(JobOperations.toDTO(job3),
jobListResponse.getJobs().get(2));
+
+ // Test list jobs by template name
+ when(jobOperationDispatcher.listJobs(metalake, Optional.of(templateName)))
+ .thenReturn(Lists.newArrayList(job1, job2));
+
+ Response resp1 =
+ target(jobRunPath())
+ .queryParam("jobTemplateName", templateName)
+ .request(APPLICATION_JSON_TYPE)
+ .accept("application/vnd.gravitino.v1+json")
+ .get();
+
+ Assertions.assertEquals(Response.Status.OK.getStatusCode(),
resp1.getStatus());
+ Assertions.assertEquals(APPLICATION_JSON_TYPE, resp1.getMediaType());
+
+ JobListResponse jobListResponse1 = resp1.readEntity(JobListResponse.class);
+ Assertions.assertEquals(0, jobListResponse1.getCode());
+ Assertions.assertEquals(2, jobListResponse1.getJobs().size());
+ Assertions.assertEquals(JobOperations.toDTO(job1),
jobListResponse1.getJobs().get(0));
+ Assertions.assertEquals(JobOperations.toDTO(job2),
jobListResponse1.getJobs().get(1));
+
+ // From here on each section re-stubs listJobs with any() matchers on the same mock, so the
+ // sections depend on running in this order.
+ // Test throw NoSuchMetalakeException
+ doThrow(new NoSuchMetalakeException("mock error"))
+ .when(jobOperationDispatcher)
+ .listJobs(any(), any());
+
+ Response resp2 =
+ target(jobRunPath())
+ .request(APPLICATION_JSON_TYPE)
+ .accept("application/vnd.gravitino.v1+json")
+ .get();
+
+ Assertions.assertEquals(Response.Status.NOT_FOUND.getStatusCode(),
resp2.getStatus());
+
+ ErrorResponse errorResp = resp2.readEntity(ErrorResponse.class);
+ Assertions.assertEquals(ErrorConstants.NOT_FOUND_CODE,
errorResp.getCode());
+ Assertions.assertEquals(NoSuchMetalakeException.class.getSimpleName(),
errorResp.getType());
+
+ // Test throw MetalakeNotInUseException
+ doThrow(new MetalakeNotInUseException("mock error"))
+ .when(jobOperationDispatcher)
+ .listJobs(any(), any());
+
+ Response resp3 =
+ target(jobRunPath())
+ .request(APPLICATION_JSON_TYPE)
+ .accept("application/vnd.gravitino.v1+json")
+ .get();
+
+ Assertions.assertEquals(Response.Status.CONFLICT.getStatusCode(),
resp3.getStatus());
+
+ ErrorResponse errorResp2 = resp3.readEntity(ErrorResponse.class);
+ Assertions.assertEquals(ErrorConstants.NOT_IN_USE_CODE,
errorResp2.getCode());
+ Assertions.assertEquals(MetalakeNotInUseException.class.getSimpleName(),
errorResp2.getType());
+
+ // Test NoSuchJobTemplateException
+ doThrow(new NoSuchJobTemplateException("mock error"))
+ .when(jobOperationDispatcher)
+ .listJobs(any(), any());
+
+ Response resp4 =
+ target(jobRunPath())
+ .request(APPLICATION_JSON_TYPE)
+ .accept("application/vnd.gravitino.v1+json")
+ .get();
+
+ Assertions.assertEquals(Response.Status.NOT_FOUND.getStatusCode(),
resp4.getStatus());
+
+ ErrorResponse errorResp3 = resp4.readEntity(ErrorResponse.class);
+ Assertions.assertEquals(ErrorConstants.NOT_FOUND_CODE,
errorResp3.getCode());
+ Assertions.assertEquals(NoSuchJobTemplateException.class.getSimpleName(),
errorResp3.getType());
+
+ // Test throw RuntimeException
+ doThrow(new RuntimeException("mock
error")).when(jobOperationDispatcher).listJobs(any(), any());
+
+ Response resp5 =
+ target(jobRunPath())
+ .request(APPLICATION_JSON_TYPE)
+ .accept("application/vnd.gravitino.v1+json")
+ .get();
+
+ Assertions.assertEquals(
+ Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(),
resp5.getStatus());
+
+ ErrorResponse errorResp4 = resp5.readEntity(ErrorResponse.class);
+ Assertions.assertEquals(ErrorConstants.INTERNAL_ERROR_CODE,
errorResp4.getCode());
+ Assertions.assertEquals(RuntimeException.class.getSimpleName(),
errorResp4.getType());
+ }
+
+ /**
+ * Exercises POST on the job runs path: a successful run returning the job, and the NOT_FOUND
+ * response produced when the dispatcher throws NoSuchJobTemplateException.
+ */
+ @Test
+ public void testRunJob() {
+ String templateName = "shell_template_1";
+ Map<String, String> jobConf = ImmutableMap.of("key1", "value1", "key2",
"value2");
+ JobEntity job = newJobEntity(templateName, JobHandle.Status.QUEUED);
+ JobRunRequest req = new JobRunRequest(templateName, jobConf);
+
+ // Success path: the stub matches the exact template name and configuration sent in the request.
+ when(jobOperationDispatcher.runJob(metalake, templateName,
jobConf)).thenReturn(job);
+
+ Response resp =
+ target(jobRunPath())
+ .request(APPLICATION_JSON_TYPE)
+ .accept("application/vnd.gravitino.v1+json")
+ .post(Entity.entity(req, APPLICATION_JSON_TYPE));
+
+ Assertions.assertEquals(Response.Status.OK.getStatusCode(),
resp.getStatus());
+ Assertions.assertEquals(APPLICATION_JSON_TYPE, resp.getMediaType());
+
+ JobResponse jobResp = resp.readEntity(JobResponse.class);
+ Assertions.assertEquals(0, jobResp.getCode());
+ Assertions.assertEquals(JobOperations.toDTO(job), jobResp.getJob());
+
+ // Test throw NoSuchJobTemplateException
+ doThrow(new NoSuchJobTemplateException("mock error"))
+ .when(jobOperationDispatcher)
+ .runJob(any(), any(), any());
+
+ Response resp2 =
+ target(jobRunPath())
+ .request(APPLICATION_JSON_TYPE)
+ .accept("application/vnd.gravitino.v1+json")
+ .post(Entity.entity(req, APPLICATION_JSON_TYPE));
+
+ Assertions.assertEquals(Response.Status.NOT_FOUND.getStatusCode(),
resp2.getStatus());
+
+ ErrorResponse errorResp = resp2.readEntity(ErrorResponse.class);
+ Assertions.assertEquals(ErrorConstants.NOT_FOUND_CODE,
errorResp.getCode());
+ Assertions.assertEquals(NoSuchJobTemplateException.class.getSimpleName(),
errorResp.getType());
+ }
+
+ /**
+ * Exercises POST on a single job run path: a successful cancel returning the job, and the
+ * NOT_FOUND response produced when the dispatcher throws NoSuchJobException.
+ */
+ @Test
+ public void testCancelJob() {
+ JobEntity job = newJobEntity("shell_template_1", JobHandle.Status.STARTED);
+
+ // Success path: the job name is used as the job id in the request path.
+ when(jobOperationDispatcher.cancelJob(metalake,
job.name())).thenReturn(job);
+
+ Response resp =
+ target(jobRunPath())
+ .path(job.name())
+ .request(APPLICATION_JSON_TYPE)
+ .accept("application/vnd.gravitino.v1+json")
+ .post(null);
+
+ Assertions.assertEquals(Response.Status.OK.getStatusCode(),
resp.getStatus());
+ Assertions.assertEquals(APPLICATION_JSON_TYPE, resp.getMediaType());
+
+ JobResponse jobResp = resp.readEntity(JobResponse.class);
+ Assertions.assertEquals(0, jobResp.getCode());
+ Assertions.assertEquals(JobOperations.toDTO(job), jobResp.getJob());
+
+ // Test throw NoSuchJobException
+ doThrow(new NoSuchJobException("mock error"))
+ .when(jobOperationDispatcher)
+ .cancelJob(any(), any());
+
+ Response resp2 =
+ target(jobRunPath())
+ .path(job.name())
+ .request(APPLICATION_JSON_TYPE)
+ .accept("application/vnd.gravitino.v1+json")
+ .post(null);
+
+ Assertions.assertEquals(Response.Status.NOT_FOUND.getStatusCode(),
resp2.getStatus());
+
+ ErrorResponse errorResp = resp2.readEntity(ErrorResponse.class);
+ Assertions.assertEquals(ErrorConstants.NOT_FOUND_CODE,
errorResp.getCode());
+ Assertions.assertEquals(NoSuchJobException.class.getSimpleName(),
errorResp.getType());
+ }
+
private String jobTemplatePath() {
return "/metalakes/" + metalake + "/jobs/templates";
}
+ /** Returns the REST path of the job runs resource for the test metalake. */
+ private String jobRunPath() {
+ return String.format("/metalakes/%s/jobs/runs", metalake);
+ }
+
private JobTemplateEntity newShellJobTemplateEntity(String name, String
comment) {
ShellJobTemplate shellJobTemplate =
ShellJobTemplate.builder()
@@ -529,4 +736,17 @@ public class TestJobOperations extends JerseyTest {
.withAuditInfo(auditInfo)
.build();
}
+
+ /**
+ * Builds a job entity in the test metalake's job namespace with a random id, a random job
+ * execution id, the given template name and status, and audit info created by "test" now.
+ */
+ private JobEntity newJobEntity(String templateName, JobHandle.Status status) {
+ Random random = new Random();
+ long id = random.nextLong();
+ String executionId = random.nextLong() + "";
+ AuditInfo auditInfo =
+ AuditInfo.builder().withCreator("test").withCreateTime(Instant.now()).build();
+
+ return JobEntity.builder()
+ .withId(id)
+ .withJobExecutionId(executionId)
+ .withNamespace(NamespaceUtil.ofJob(metalake))
+ .withJobTemplateName(templateName)
+ .withStatus(status)
+ .withAuditInfo(auditInfo)
+ .build();
+ }
}