This is an automated email from the ASF dual-hosted git repository.
jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new daaa14b591 [#7504] feat(core): Add the job framework for Job System
(part-1) (#7695)
daaa14b591 is described below
commit daaa14b59161347f340ec18b8f0dcb45094d96c4
Author: Jerry Shao <[email protected]>
AuthorDate: Mon Jul 21 15:17:31 2025 +0800
[#7504] feat(core): Add the job framework for Job System (part-1) (#7695)
### What changes were proposed in this pull request?
This is the first part of work to add the job framework for Job System
in Gravitino.
### Why are the changes needed?
Fix: #7504
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Add new UTs to cover the code.
---
.../java/org/apache/gravitino/job/JobHandle.java | 3 +
.../java/org/apache/gravitino/job/JobTemplate.java | 5 +
.../org/apache/gravitino/job/ShellJobTemplate.java | 5 +
.../org/apache/gravitino/job/SparkJobTemplate.java | 15 +
.../main/java/org/apache/gravitino/Configs.java | 7 +
.../src/main/java/org/apache/gravitino/Entity.java | 41 +-
.../gravitino/SupportsRelationOperations.java | 4 +-
.../java/org/apache/gravitino/job/JobManager.java | 233 +++++++++++
.../gravitino/job/JobOperationDispatcher.java | 101 +++++
.../java/org/apache/gravitino/meta/JobEntity.java | 161 ++++++++
.../apache/gravitino/meta/JobTemplateEntity.java | 239 +++++++++++
.../gravitino/storage/relational/JDBCBackend.java | 8 +
.../apache/gravitino/utils/NameIdentifierUtil.java | 22 ++
.../org/apache/gravitino/utils/NamespaceUtil.java | 21 +
.../org/apache/gravitino/job/TestJobManager.java | 438 +++++++++++++++++++++
15 files changed, 1267 insertions(+), 36 deletions(-)
diff --git a/api/src/main/java/org/apache/gravitino/job/JobHandle.java
b/api/src/main/java/org/apache/gravitino/job/JobHandle.java
index 815e72ea48..38927538de 100644
--- a/api/src/main/java/org/apache/gravitino/job/JobHandle.java
+++ b/api/src/main/java/org/apache/gravitino/job/JobHandle.java
@@ -24,6 +24,9 @@ package org.apache.gravitino.job;
*/
public interface JobHandle {
+ /** The prefix for job IDs. Every job ID returned from Gravitino will be
like job-uuid. */
+ String JOB_ID_PREFIX = "job-";
+
/** The status of the job. */
enum Status {
diff --git a/api/src/main/java/org/apache/gravitino/job/JobTemplate.java
b/api/src/main/java/org/apache/gravitino/job/JobTemplate.java
index 51714b6784..fa6d89f211 100644
--- a/api/src/main/java/org/apache/gravitino/job/JobTemplate.java
+++ b/api/src/main/java/org/apache/gravitino/job/JobTemplate.java
@@ -41,6 +41,11 @@ import org.apache.commons.lang3.StringUtils;
* <li>arguments
* <li>environments
* </ul>
+ *
+ * <p>executable is the path to the executable that will be run. It should be
an absolute path that
+ * can be accessed by the Gravitino server. Currently, Gravitino supports
executables in the local
+ * file system or on a web server (e.g., HTTP, HTTPS, or FTP). Distributed
file systems like HDFS
+ * or S3 will be supported in the future.
*/
public abstract class JobTemplate {
diff --git a/api/src/main/java/org/apache/gravitino/job/ShellJobTemplate.java
b/api/src/main/java/org/apache/gravitino/job/ShellJobTemplate.java
index 566d2b74eb..afb6fdd07d 100644
--- a/api/src/main/java/org/apache/gravitino/job/ShellJobTemplate.java
+++ b/api/src/main/java/org/apache/gravitino/job/ShellJobTemplate.java
@@ -26,6 +26,11 @@ import java.util.Objects;
* Represents a job template for executing shell scripts. This class extends
the JobTemplate class
* and provides functionality specific to shell job templates, including a
list of scripts to be
* executed.
+ *
+ * <p>Scripts are a list of shell files that will be leveraged by the
"executable". Scripts must be
+ * placed where the Gravitino server can access them. Currently, Gravitino
supports scripts in
+ * the local file system or on a web server (e.g., HTTP, HTTPS, or FTP).
Distributed file systems
+ * like HDFS or S3 will be supported in the future.
*/
public class ShellJobTemplate extends JobTemplate {
diff --git a/api/src/main/java/org/apache/gravitino/job/SparkJobTemplate.java
b/api/src/main/java/org/apache/gravitino/job/SparkJobTemplate.java
index c70d193ede..a1f68f7e75 100644
--- a/api/src/main/java/org/apache/gravitino/job/SparkJobTemplate.java
+++ b/api/src/main/java/org/apache/gravitino/job/SparkJobTemplate.java
@@ -30,6 +30,21 @@ import org.apache.commons.lang3.StringUtils;
* Represents a job template for executing Spark applications. This class
extends the JobTemplate
* class and provides functionality specific to Spark job templates, including
the class name, jars,
* files, archives, and configurations required for the Spark job.
+ *
+ * <p>Take Spark word count job as an example:
+ *
+ * <p>className: "org.apache.spark.examples.JavaWordCount" executable:
+ * "https://example.com/spark-examples.jar" arguments: ["{{input_path}}",
"{{output_path}}"]
+ * configs: {"spark.master": "local[*]", "spark.app.name": "WordCount"}
+ *
+ * <p>configs is a map of configuration parameters that will be used by the
Spark application. It
+ * can be templated by using placeholders like "{{foo_value}}" and
"{{bar_value}}". These
+ * placeholders will be replaced with actual values when the job is executed.
+ *
+ * <p>jars, files, and archives are lists of resources that will be used by
the Spark application.
+ * These resources must be accessible to the Gravitino server, and can be
located in the local file
+ * system or on a web server (e.g., HTTP, HTTPS, or FTP). Distributed file systems
like HDFS or S3 will
+ * be supported in the future.
*/
public class SparkJobTemplate extends JobTemplate {
diff --git a/core/src/main/java/org/apache/gravitino/Configs.java
b/core/src/main/java/org/apache/gravitino/Configs.java
index 24d445ccb7..5a6544b1d4 100644
--- a/core/src/main/java/org/apache/gravitino/Configs.java
+++ b/core/src/main/java/org/apache/gravitino/Configs.java
@@ -406,4 +406,11 @@ public class Configs {
.stringConf()
.checkValue(StringUtils::isNotBlank,
ConfigConstants.NOT_BLANK_ERROR_MSG)
.createWithDefault("caffeine");
+
+ public static final ConfigEntry<String> JOB_STAGING_DIR =
+ new ConfigBuilder("gravitino.job.stagingDir")
+ .doc("Directory for managing staging files when running jobs.")
+ .version(ConfigConstants.VERSION_1_0_0)
+ .stringConf()
+ .createWithDefault("/tmp/gravitino/jobs/staging");
}
diff --git a/core/src/main/java/org/apache/gravitino/Entity.java
b/core/src/main/java/org/apache/gravitino/Entity.java
index 355455c7b1..c8d973f8c5 100644
--- a/core/src/main/java/org/apache/gravitino/Entity.java
+++ b/core/src/main/java/org/apache/gravitino/Entity.java
@@ -18,9 +18,7 @@
*/
package org.apache.gravitino;
-import com.google.common.collect.ImmutableList;
import java.io.Serializable;
-import java.util.List;
import java.util.Map;
import lombok.Getter;
@@ -58,6 +56,10 @@ public interface Entity extends Serializable {
/** The policy schema name in the system catalog. */
String POLICY_SCHEMA_NAME = "policy";
+ String JOB_TEMPLATE_SCHEMA_NAME = "job_template";
+
+ String JOB_SCHEMA_NAME = "job";
+
/** Enumeration defining the types of entities in the Gravitino framework. */
@Getter
enum EntityType {
@@ -75,40 +77,9 @@ public interface Entity extends Serializable {
MODEL,
MODEL_VERSION,
POLICY,
+ JOB_TEMPLATE,
+ JOB,
AUDIT;
-
- /**
- * Returns the parent entity types of the given entity type. The parent
entity types are the
- * entity types that are higher in the hierarchy than the given entity
type. For example, the
- * parent entity types of a table are metalake, catalog, and schema.
(Sequence: root to leaf)
- *
- * @param entityType The entity type for which to get the parent entity
types.
- * @return The parent entity types of the given entity type.
- */
- public static List<EntityType> getParentEntityTypes(EntityType entityType)
{
- switch (entityType) {
- case METALAKE:
- return ImmutableList.of();
- case CATALOG:
- return ImmutableList.of(METALAKE);
- case SCHEMA:
- return ImmutableList.of(METALAKE, CATALOG);
- case TABLE:
- case FILESET:
- case TOPIC:
- case MODEL:
- case USER:
- case GROUP:
- case ROLE:
- return ImmutableList.of(METALAKE, CATALOG, SCHEMA);
- case COLUMN:
- return ImmutableList.of(METALAKE, CATALOG, SCHEMA, TABLE);
- case MODEL_VERSION:
- return ImmutableList.of(METALAKE, CATALOG, SCHEMA, MODEL);
- default:
- throw new IllegalArgumentException("Unknown entity type: " +
entityType);
- }
- }
}
/**
diff --git
a/core/src/main/java/org/apache/gravitino/SupportsRelationOperations.java
b/core/src/main/java/org/apache/gravitino/SupportsRelationOperations.java
index dad992412d..22bc08acd5 100644
--- a/core/src/main/java/org/apache/gravitino/SupportsRelationOperations.java
+++ b/core/src/main/java/org/apache/gravitino/SupportsRelationOperations.java
@@ -36,7 +36,9 @@ public interface SupportsRelationOperations {
/** Role and user relationship */
ROLE_USER_REL,
/** Role and group relationship */
- ROLE_GROUP_REL
+ ROLE_GROUP_REL,
+ /** Job template and job relationship */
+ JOB_TEMPLATE_JOB_REL
}
/**
diff --git a/core/src/main/java/org/apache/gravitino/job/JobManager.java
b/core/src/main/java/org/apache/gravitino/job/JobManager.java
new file mode 100644
index 0000000000..09c36cf22f
--- /dev/null
+++ b/core/src/main/java/org/apache/gravitino/job/JobManager.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.job;
+
+import static org.apache.gravitino.metalake.MetalakeManager.checkMetalake;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import org.apache.gravitino.Config;
+import org.apache.gravitino.Configs;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.EntityAlreadyExistsException;
+import org.apache.gravitino.EntityStore;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.SupportsRelationOperations;
+import org.apache.gravitino.exceptions.InUseException;
+import org.apache.gravitino.exceptions.JobTemplateAlreadyExistsException;
+import org.apache.gravitino.exceptions.NoSuchEntityException;
+import org.apache.gravitino.exceptions.NoSuchJobException;
+import org.apache.gravitino.exceptions.NoSuchJobTemplateException;
+import org.apache.gravitino.lock.LockType;
+import org.apache.gravitino.lock.TreeLockUtils;
+import org.apache.gravitino.meta.JobEntity;
+import org.apache.gravitino.meta.JobTemplateEntity;
+import org.apache.gravitino.utils.NameIdentifierUtil;
+import org.apache.gravitino.utils.NamespaceUtil;
+
+/**
+ * Implementation of {@link JobOperationDispatcher} backed by an {@link EntityStore}.
+ *
+ * <p>Each operation first verifies the metalake with {@code checkMetalake} and then performs its
+ * entity store call inside {@link TreeLockUtils#doWithTreeLock}. An {@link IOException} raised by
+ * the store is rethrown wrapped in a {@link RuntimeException}.
+ */
+public class JobManager implements JobOperationDispatcher, Closeable {
+
+ private final EntityStore entityStore;
+
+ private final File stagingDir;
+
+ /**
+ * Creates a job manager and prepares the staging directory configured by {@link
+ * Configs#JOB_STAGING_DIR}.
+ *
+ * <p>If the configured path already exists it must be a directory that is readable, writable
+ * and executable; otherwise the directory (including missing parents) is created.
+ *
+ * @param config the configuration from which the staging directory path is read
+ * @param entityStore the entity store used to persist and look up job templates and jobs
+ * @throws IllegalArgumentException if the path exists but is not a directory, is not
+ * accessible, or cannot be created
+ */
+ public JobManager(Config config, EntityStore entityStore) {
+ this.entityStore = entityStore;
+
+ String stagingDirPath = config.get(Configs.JOB_STAGING_DIR);
+ this.stagingDir = new File(stagingDirPath);
+ if (stagingDir.exists()) {
+ if (!stagingDir.isDirectory()) {
+ throw new IllegalArgumentException(
+ String.format("Staging directory %s exists but is not a
directory", stagingDirPath));
+ }
+
+ // All three permissions are required on an existing staging directory.
+ if (!(stagingDir.canExecute() && stagingDir.canRead() &&
stagingDir.canWrite())) {
+ throw new IllegalArgumentException(
+ String.format("Staging directory %s is not accessible",
stagingDirPath));
+ }
+ } else {
+ if (!stagingDir.mkdirs()) {
+ throw new IllegalArgumentException(
+ String.format("Failed to create staging directory %s",
stagingDirPath));
+ }
+ }
+ }
+
+ @Override
+ public List<JobTemplateEntity> listJobTemplates(String metalake) {
+ checkMetalake(NameIdentifierUtil.ofMetalake(metalake), entityStore);
+
+ // The read lock is taken on the job template namespace itself, not on a single template.
+ Namespace jobTemplateNs = NamespaceUtil.ofJobTemplate(metalake);
+ return TreeLockUtils.doWithTreeLock(
+ NameIdentifier.of(jobTemplateNs.levels()),
+ LockType.READ,
+ () -> {
+ try {
+ return entityStore.list(
+ jobTemplateNs, JobTemplateEntity.class,
Entity.EntityType.JOB_TEMPLATE);
+ } catch (IOException ioe) {
+ throw new RuntimeException(ioe);
+ }
+ });
+ }
+
+ @Override
+ public void registerJobTemplate(String metalake, JobTemplateEntity
jobTemplateEntity)
+ throws JobTemplateAlreadyExistsException {
+ checkMetalake(NameIdentifierUtil.ofMetalake(metalake), entityStore);
+
+ NameIdentifier jobTemplateIdent =
+ NameIdentifierUtil.ofJobTemplate(metalake, jobTemplateEntity.name());
+ TreeLockUtils.doWithTreeLock(
+ jobTemplateIdent,
+ LockType.WRITE,
+ () -> {
+ try {
+ // overwrite is false, so an existing template surfaces as EntityAlreadyExistsException.
+ entityStore.put(jobTemplateEntity, false /* overwrite */);
+ return null;
+ } catch (EntityAlreadyExistsException e) {
+ throw new JobTemplateAlreadyExistsException(
+ "Job template with name %s under metalake %s already exists",
+ jobTemplateEntity.name(), metalake);
+ } catch (IOException ioe) {
+ throw new RuntimeException(ioe);
+ }
+ });
+ }
+
+ @Override
+ public JobTemplateEntity getJobTemplate(String metalake, String
jobTemplateName)
+ throws NoSuchJobTemplateException {
+ checkMetalake(NameIdentifierUtil.ofMetalake(metalake), entityStore);
+
+ NameIdentifier jobTemplateIdent =
NameIdentifierUtil.ofJobTemplate(metalake, jobTemplateName);
+ return TreeLockUtils.doWithTreeLock(
+ jobTemplateIdent,
+ LockType.READ,
+ () -> {
+ try {
+ return entityStore.get(
+ jobTemplateIdent, Entity.EntityType.JOB_TEMPLATE,
JobTemplateEntity.class);
+ } catch (NoSuchEntityException e) {
+ // Translate the generic store exception into the job-template-specific one.
+ throw new NoSuchJobTemplateException(
+ "Job template with name %s under metalake %s does not exist",
+ jobTemplateName, metalake);
+ } catch (IOException ioe) {
+ throw new RuntimeException(ioe);
+ }
+ });
+ }
+
+ @Override
+ public boolean deleteJobTemplate(String metalake, String jobTemplateName)
throws InUseException {
+ checkMetalake(NameIdentifierUtil.ofMetalake(metalake), entityStore);
+
+ // TODO. Check if there are any running jobs associated with the job
template. If there are
+ // running jobs, throw InUseException
+
+ // Delete the job template entity as well as all the jobs associated with
it.
+ // The write lock is taken on the job template namespace rather than on the single template.
+ return TreeLockUtils.doWithTreeLock(
+ NameIdentifier.of(NamespaceUtil.ofJobTemplate(metalake).levels()),
+ LockType.WRITE,
+ () -> {
+ try {
+ return entityStore.delete(
+ NameIdentifierUtil.ofJobTemplate(metalake, jobTemplateName),
+ Entity.EntityType.JOB_TEMPLATE);
+ } catch (IOException ioe) {
+ throw new RuntimeException(ioe);
+ }
+ });
+ }
+
+ @Override
+ public List<JobEntity> listJobs(String metalake, Optional<String>
jobTemplateName)
+ throws NoSuchJobTemplateException {
+ checkMetalake(NameIdentifierUtil.ofMetalake(metalake), entityStore);
+
+ // Outer read lock on the job namespace; the template-specific lock below is nested inside it.
+ Namespace jobNs = NamespaceUtil.ofJob(metalake);
+ return TreeLockUtils.doWithTreeLock(
+ NameIdentifier.of(jobNs.levels()),
+ LockType.READ,
+ () -> {
+ try {
+ // If jobTemplateName is present, check if the job template
exists, will throw an
+ // exception if the job template does not exist.
+ jobTemplateName.ifPresent(s -> getJobTemplate(metalake, s));
+
+ List<JobEntity> jobEntities;
+ if (jobTemplateName.isPresent()) {
+ NameIdentifier jobTemplateIdent =
+ NameIdentifierUtil.ofJobTemplate(metalake,
jobTemplateName.get());
+
+ // Lock the job template to ensure no concurrent
modifications/deletions
+ jobEntities =
+ TreeLockUtils.doWithTreeLock(
+ jobTemplateIdent,
+ LockType.READ,
+ () ->
+ // List all the jobs associated with the job template
+ entityStore
+ .relationOperations()
+ .listEntitiesByRelation(
+
SupportsRelationOperations.Type.JOB_TEMPLATE_JOB_REL,
+ jobTemplateIdent,
+ Entity.EntityType.JOB_TEMPLATE));
+ } else {
+ // No template filter: list every job stored under the job namespace.
+ jobEntities = entityStore.list(jobNs, JobEntity.class,
Entity.EntityType.JOB);
+ }
+ return jobEntities;
+
+ } catch (IOException ioe) {
+ throw new RuntimeException(ioe);
+ }
+ });
+ }
+
+ @Override
+ public JobEntity getJob(String metalake, String jobId) throws
NoSuchJobException {
+ checkMetalake(NameIdentifierUtil.ofMetalake(metalake), entityStore);
+
+ NameIdentifier jobIdent = NameIdentifierUtil.ofJob(metalake, jobId);
+ return TreeLockUtils.doWithTreeLock(
+ jobIdent,
+ LockType.READ,
+ () -> {
+ try {
+ return entityStore.get(jobIdent, Entity.EntityType.JOB,
JobEntity.class);
+ } catch (NoSuchEntityException e) {
+ // Translate the generic store exception into the job-specific one.
+ throw new NoSuchJobException(
+ "Job with ID %s under metalake %s does not exist", jobId,
metalake);
+ } catch (IOException ioe) {
+ throw new RuntimeException(ioe);
+ }
+ });
+ }
+
+ /** Currently a no-op; cleanup logic has not been implemented yet. */
+ @Override
+ public void close() throws IOException {
+ // TODO. Implement any necessary cleanup logic for the JobManager.
+ }
+}
diff --git
a/core/src/main/java/org/apache/gravitino/job/JobOperationDispatcher.java
b/core/src/main/java/org/apache/gravitino/job/JobOperationDispatcher.java
new file mode 100644
index 0000000000..42f0d75e05
--- /dev/null
+++ b/core/src/main/java/org/apache/gravitino/job/JobOperationDispatcher.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.job;
+
+import java.util.List;
+import java.util.Optional;
+import org.apache.gravitino.exceptions.InUseException;
+import org.apache.gravitino.exceptions.JobTemplateAlreadyExistsException;
+import org.apache.gravitino.exceptions.NoSuchJobException;
+import org.apache.gravitino.exceptions.NoSuchJobTemplateException;
+import org.apache.gravitino.meta.JobEntity;
+import org.apache.gravitino.meta.JobTemplateEntity;
+
+/** The interface for job operation dispatcher. */
+public interface JobOperationDispatcher {
+
+ /**
+ * Lists all the job templates in the specified metalake.
+ *
+ * @param metalake the name of the metalake
+ * @return a list of job templates
+ */
+ List<JobTemplateEntity> listJobTemplates(String metalake);
+
+ /**
+ * Registers a job template in the specified metalake.
+ *
+ * @param metalake the name of the metalake
+ * @param jobTemplateEntity the job template entity to register
+ * @throws JobTemplateAlreadyExistsException if a job template with the same
name already exists
+ */
+ void registerJobTemplate(String metalake, JobTemplateEntity
jobTemplateEntity)
+ throws JobTemplateAlreadyExistsException;
+
+ /**
+ * Retrieves a job template by its name in the specified metalake.
+ *
+ * @param metalake the name of the metalake
+ * @param jobTemplateName the name of the job template to retrieve
+ * @return the job template entity associated with the specified name
+ * @throws NoSuchJobTemplateException if no job template with the specified
name exists
+ */
+ JobTemplateEntity getJobTemplate(String metalake, String jobTemplateName)
+ throws NoSuchJobTemplateException;
+
+ /**
+ * Deletes a job template by its name in the specified metalake.
+ *
+ * @param metalake the name of the metalake
+ * @param jobTemplateName the name of the job template to delete
+ * @return true if the job template was successfully deleted, false if the
job template does not
+ * exist
+ * @throws InUseException if there are still queued or started jobs
associated with the job
+ * template
+ */
+ boolean deleteJobTemplate(String metalake, String jobTemplateName) throws
InUseException;
+
+ /**
+ * Lists all the jobs. If the jobTemplateName is provided, it will list the
jobs associated with
+ * that job template, if not, it will list all the jobs in the specified
metalake.
+ *
+ * @param metalake the name of the metalake
+ * @param jobTemplateName the name of the job template to filter jobs by, if
present
+ * @return a list of job entities
+ * @throws NoSuchJobTemplateException if the job template does not exist
+ */
+ List<JobEntity> listJobs(String metalake, Optional<String> jobTemplateName)
+ throws NoSuchJobTemplateException;
+
+ /**
+ * Retrieves a job by its ID in the specified metalake.
+ *
+ * @param metalake the name of the metalake
+ * @param jobId the ID of the job to retrieve
+ * @return the job entity associated with the specified ID
+ * @throws NoSuchJobException if no job with the specified ID exists
+ */
+ JobEntity getJob(String metalake, String jobId) throws NoSuchJobException;
+
+ // TODO. Implement the runJob and cancelJob methods in another PR.
+ // JobEntity runJob(String metalake, String jobTemplateName, Map<String,
String> jobConf)
+ // throws NoSuchJobTemplateException;
+ //
+ // JobEntity cancelJob(String metalake, String jobId) throws
NoSuchJobException;
+}
diff --git a/core/src/main/java/org/apache/gravitino/meta/JobEntity.java
b/core/src/main/java/org/apache/gravitino/meta/JobEntity.java
new file mode 100644
index 0000000000..4083154685
--- /dev/null
+++ b/core/src/main/java/org/apache/gravitino/meta/JobEntity.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.meta;
+
+import com.google.common.collect.Maps;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import lombok.ToString;
+import org.apache.gravitino.Auditable;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.Field;
+import org.apache.gravitino.HasIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.job.JobHandle;
+
+/**
+ * Entity describing a single job: its numeric id, current status, the name of the job template
+ * it refers to, its namespace and its audit information.
+ *
+ * <p>Instances are created through {@link #builder()}; {@link Builder#build()} validates the
+ * entity before returning it.
+ */
+@ToString
+public class JobEntity implements Entity, Auditable, HasIdentifier {
+
+ public static final Field ID =
+ Field.required("id", Long.class, "The unique id of the job entity.");
+ public static final Field STATUS =
+ Field.required("status", JobHandle.Status.class, "The status of the
job.");
+ public static final Field TEMPLATE_NAME =
+ Field.required("job_template_name", String.class, "The name of the job
template.");
+ // The description previously said "job template entity", copied from JobTemplateEntity.
+ public static final Field AUDIT_INFO =
+ Field.required("audit_info", AuditInfo.class, "The audit details of the job entity.");
+
+ private Long id;
+ private JobHandle.Status status;
+ private String jobTemplateName;
+ private Namespace namespace;
+ private AuditInfo auditInfo;
+
+ private JobEntity() {}
+
+ /**
+ * Returns an unmodifiable map from each declared field to its current value. The namespace is
+ * not part of this map.
+ *
+ * @return the field-to-value map of this entity
+ */
+ @Override
+ public Map<Field, Object> fields() {
+ Map<Field, Object> fields = Maps.newHashMap();
+ fields.put(ID, id);
+ fields.put(TEMPLATE_NAME, jobTemplateName);
+ fields.put(STATUS, status);
+ fields.put(AUDIT_INFO, auditInfo);
+ return Collections.unmodifiableMap(fields);
+ }
+
+ /** @return the unique id of the job entity */
+ @Override
+ public Long id() {
+ return id;
+ }
+
+ /** @return the job name, built as {@link JobHandle#JOB_ID_PREFIX} followed by the id */
+ @Override
+ public String name() {
+ return JobHandle.JOB_ID_PREFIX + id;
+ }
+
+ /** @return the namespace of the job entity */
+ @Override
+ public Namespace namespace() {
+ return namespace;
+ }
+
+ /** @return the status of the job */
+ public JobHandle.Status status() {
+ return status;
+ }
+
+ /** @return the name of the job template this job refers to */
+ public String jobTemplateName() {
+ return jobTemplateName;
+ }
+
+ /** @return the audit details of the job entity */
+ @Override
+ public AuditInfo auditInfo() {
+ return auditInfo;
+ }
+
+ /** @return {@link EntityType#JOB} */
+ @Override
+ public EntityType type() {
+ return EntityType.JOB;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof JobEntity)) {
+ return false;
+ }
+
+ JobEntity that = (JobEntity) o;
+ return Objects.equals(id, that.id)
+ && Objects.equals(status, that.status)
+ && Objects.equals(jobTemplateName, that.jobTemplateName)
+ && Objects.equals(namespace, that.namespace)
+ && Objects.equals(auditInfo, that.auditInfo);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(id, namespace, status, jobTemplateName, auditInfo);
+ }
+
+ /** @return a new builder for {@link JobEntity} */
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ /** Builder that populates a single {@link JobEntity} instance and validates it on build. */
+ public static class Builder {
+ private final JobEntity jobEntity;
+
+ private Builder() {
+ this.jobEntity = new JobEntity();
+ }
+
+ public Builder withId(Long id) {
+ jobEntity.id = id;
+ return this;
+ }
+
+ public Builder withNamespace(Namespace namespace) {
+ jobEntity.namespace = namespace;
+ return this;
+ }
+
+ public Builder withStatus(JobHandle.Status status) {
+ jobEntity.status = status;
+ return this;
+ }
+
+ public Builder withJobTemplateName(String jobTemplateName) {
+ jobEntity.jobTemplateName = jobTemplateName;
+ return this;
+ }
+
+ public Builder withAuditInfo(AuditInfo auditInfo) {
+ jobEntity.auditInfo = auditInfo;
+ return this;
+ }
+
+ /**
+ * Validates the populated entity and returns it.
+ *
+ * @return the validated job entity
+ */
+ public JobEntity build() {
+ jobEntity.validate();
+ return jobEntity;
+ }
+ }
+}
diff --git
a/core/src/main/java/org/apache/gravitino/meta/JobTemplateEntity.java
b/core/src/main/java/org/apache/gravitino/meta/JobTemplateEntity.java
new file mode 100644
index 0000000000..de030c5d50
--- /dev/null
+++ b/core/src/main/java/org/apache/gravitino/meta/JobTemplateEntity.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.meta;
+
+import com.google.common.collect.Maps;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import lombok.AllArgsConstructor;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.ToString;
+import lombok.experimental.Accessors;
+import org.apache.gravitino.Auditable;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.Field;
+import org.apache.gravitino.HasIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.job.JobTemplate;
+import org.apache.gravitino.job.ShellJobTemplate;
+import org.apache.gravitino.job.SparkJobTemplate;
+
+@ToString
+public class JobTemplateEntity implements Entity, Auditable, HasIdentifier {
+
+ /**
+ * A job template content that will be used internally to maintain all the
information of a job
+ * template. This class will be serialized and stored in the entity store.
Internally, we don't
+ * separate the different types of job templates, so this class will be used
to represent all the
+ * job templates.
+ */
+ @Getter()
+ @Accessors(fluent = true)
+ @ToString
+ @NoArgsConstructor
+ @AllArgsConstructor(access = lombok.AccessLevel.PRIVATE)
+ @EqualsAndHashCode
+ @lombok.Builder(setterPrefix = "with")
+ public static class TemplateContent {
+ // Generic fields for job template content
+ private JobTemplate.JobType jobType;
+ private String executable;
+ private List<String> arguments;
+ private Map<String, String> environments;
+ private Map<String, String> customFields;
+ // Fields for Shell job template content
+ private List<String> scripts;
+ // Fields for Spark job template content
+ private String className;
+ private List<String> jars;
+ private List<String> files;
+ private List<String> archives;
+ private Map<String, String> configs;
+
+ public static TemplateContent fromJobTemplate(JobTemplate jobTemplate) {
+ TemplateContentBuilder builder =
+ TemplateContent.builder()
+ .withJobType(jobTemplate.jobType())
+ .withExecutable(jobTemplate.executable())
+ .withArguments(jobTemplate.arguments())
+ .withEnvironments(jobTemplate.environments())
+ .withCustomFields(jobTemplate.customFields());
+
+ if (jobTemplate instanceof ShellJobTemplate) {
+ ShellJobTemplate shellJobTemplate = (ShellJobTemplate) jobTemplate;
+ builder.withScripts(shellJobTemplate.scripts());
+
+ } else if (jobTemplate instanceof SparkJobTemplate) {
+ SparkJobTemplate sparkJobTemplate = (SparkJobTemplate) jobTemplate;
+ builder
+ .withClassName(sparkJobTemplate.className())
+ .withJars(sparkJobTemplate.jars())
+ .withFiles(sparkJobTemplate.files())
+ .withArchives(sparkJobTemplate.archives())
+ .withConfigs(sparkJobTemplate.configs());
+ }
+
+ return builder.build();
+ }
+ }
+
+  /** Required field holding the unique id of the job template entity. */
+  public static final Field ID =
+      Field.required("id", Long.class, "The unique id of the job template entity.");
+
+  /** Required field holding the name of the job template entity. */
+  public static final Field NAME =
+      Field.required("name", String.class, "The name of the job template entity.");
+
+  /** Optional field holding the comment or description of the job template entity. */
+  public static final Field COMMENT =
+      Field.optional(
+          "comment", String.class, "The comment or description of the job template entity.");
+
+  /** Required field holding the {@link TemplateContent} of the job template entity. */
+  public static final Field TEMPLATE_CONTENT =
+      Field.required(
+          "template_content",
+          TemplateContent.class,
+          "The template content of the job template entity.");
+
+  /** Required field holding the audit details of the job template entity. */
+  public static final Field AUDIT_INFO =
+      Field.required(
+          "audit_info", AuditInfo.class, "The audit details of the job template entity.");
+
+ // Entity state. The nested Builder assigns these fields directly on the instance it creates.
+ private Long id;
+ private String name;
+ private Namespace namespace;
+ private String comment;
+ private TemplateContent templateContent;
+ private AuditInfo auditInfo;
+
+ // Private: instances are created by the nested Builder, obtained through builder().
+ private JobTemplateEntity() {}
+
+  /**
+   * Returns the value currently held for each declared {@link Field} of this entity.
+   *
+   * <p>The map contains the id, name, comment, template content and audit info entries; the
+   * namespace is not part of it. The returned map cannot be modified.
+   *
+   * @return an unmodifiable map from field definition to field value
+   */
+  @Override
+  public Map<Field, Object> fields() {
+    Map<Field, Object> valuesByField = Maps.newHashMap();
+
+    valuesByField.put(ID, this.id);
+    valuesByField.put(NAME, this.name);
+    valuesByField.put(COMMENT, this.comment);
+    valuesByField.put(TEMPLATE_CONTENT, this.templateContent);
+    valuesByField.put(AUDIT_INFO, this.auditInfo);
+
+    return Collections.unmodifiableMap(valuesByField);
+  }
+
+ /** Returns the id of this job template entity. */
+ @Override
+ public Long id() {
+ return id;
+ }
+
+ /** Returns the name of this job template entity. */
+ @Override
+ public String name() {
+ return name;
+ }
+
+ /** Returns the namespace this job template entity was built with. */
+ @Override
+ public Namespace namespace() {
+ return namespace;
+ }
+
+ /**
+ * Returns the comment of this job template entity.
+ *
+ * @return the comment; the COMMENT field is declared optional, so this may be null
+ */
+ public String comment() {
+ return comment;
+ }
+
+ /**
+ * Returns the template content of this job template entity.
+ *
+ * @return the template content set through the builder
+ */
+ public TemplateContent templateContent() {
+ return templateContent;
+ }
+
+ /** Returns the audit details of this job template entity. */
+ @Override
+ public AuditInfo auditInfo() {
+ return auditInfo;
+ }
+
+ /** Returns the entity type, which is always {@code EntityType.JOB_TEMPLATE}. */
+ @Override
+ public EntityType type() {
+ return EntityType.JOB_TEMPLATE;
+ }
+
+  /**
+   * Compares this entity with another object.
+   *
+   * <p>Two job template entities are equal when their id, name, namespace, comment, template
+   * content and audit info are all equal.
+   *
+   * @param o the object to compare with
+   * @return {@code true} if {@code o} is a {@link JobTemplateEntity} with equal state
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (o == this) {
+      return true;
+    }
+
+    if (o instanceof JobTemplateEntity) {
+      JobTemplateEntity other = (JobTemplateEntity) o;
+      return Objects.equals(this.id, other.id)
+          && Objects.equals(this.name, other.name)
+          && Objects.equals(this.namespace, other.namespace)
+          && Objects.equals(this.comment, other.comment)
+          && Objects.equals(this.templateContent, other.templateContent)
+          && Objects.equals(this.auditInfo, other.auditInfo);
+    }
+
+    return false;
+  }
+
+ /** Hashes the same six fields that {@code equals(Object)} compares. */
+ @Override
+ public int hashCode() {
+ return Objects.hash(id, name, namespace, comment, templateContent,
auditInfo);
+ }
+
+ /**
+ * Creates a new builder for a {@link JobTemplateEntity}.
+ *
+ * @return a fresh {@link Builder} instance
+ */
+ public static Builder builder() {
+ return new Builder();
+ }
+
+  /**
+   * Builder for {@link JobTemplateEntity}.
+   *
+   * <p>Each {@code withXxx} call writes straight into the single entity instance owned by this
+   * builder, and {@link #build()} hands out that same instance after calling {@code validate()}
+   * on it.
+   */
+  public static class Builder {
+    private final JobTemplateEntity entity = new JobTemplateEntity();
+
+    private Builder() {}
+
+    /** Sets the id of the entity being built. */
+    public Builder withId(Long id) {
+      entity.id = id;
+      return this;
+    }
+
+    /** Sets the name of the entity being built. */
+    public Builder withName(String name) {
+      entity.name = name;
+      return this;
+    }
+
+    /** Sets the namespace of the entity being built. */
+    public Builder withNamespace(Namespace namespace) {
+      entity.namespace = namespace;
+      return this;
+    }
+
+    /** Sets the comment of the entity being built. */
+    public Builder withComment(String comment) {
+      entity.comment = comment;
+      return this;
+    }
+
+    /** Sets the template content of the entity being built. */
+    public Builder withTemplateContent(TemplateContent templateContent) {
+      entity.templateContent = templateContent;
+      return this;
+    }
+
+    /** Sets the audit details of the entity being built. */
+    public Builder withAuditInfo(AuditInfo auditInfo) {
+      entity.auditInfo = auditInfo;
+      return this;
+    }
+
+    /**
+     * Validates and returns the entity.
+     *
+     * @return the entity populated through this builder
+     */
+    public JobTemplateEntity build() {
+      entity.validate();
+      return entity;
+    }
+  }
+}
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java
b/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java
index a51e306310..966b4e3250 100644
---
a/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java
@@ -353,6 +353,12 @@ public class JDBCBackend implements RelationalBackend {
return ModelVersionMetaService.getInstance()
.deleteModelVersionMetasByLegacyTimeline(
legacyTimeline, GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT);
+ case JOB_TEMPLATE:
+ // TODO: Implement hard delete logic for job templates.
+ return 0;
+ case JOB:
+ // TODO: Implement hard delete logic for jobs.
+ return 0;
case AUDIT:
return 0;
// TODO: Implement hard delete logic for these entity types.
@@ -380,6 +386,8 @@ public class JDBCBackend implements RelationalBackend {
case TAG:
case MODEL:
case MODEL_VERSION:
+ case JOB_TEMPLATE:
+ case JOB:
// These entity types have not implemented multi-versions, so we can
skip.
return 0;
diff --git
a/core/src/main/java/org/apache/gravitino/utils/NameIdentifierUtil.java
b/core/src/main/java/org/apache/gravitino/utils/NameIdentifierUtil.java
index e556cbce9f..10d7ec4f13 100644
--- a/core/src/main/java/org/apache/gravitino/utils/NameIdentifierUtil.java
+++ b/core/src/main/java/org/apache/gravitino/utils/NameIdentifierUtil.java
@@ -279,6 +279,28 @@ public class NameIdentifierUtil {
alias);
}
+ /**
+ * Create the job template {@link NameIdentifier} with the given metalake and job template
+ * name. The identifier's namespace is the one produced by {@code
+ * NamespaceUtil.ofJobTemplate(metalake)}.
+ *
+ * @param metalake The metalake name
+ * @param jobTemplateName The job template name
+ * @return The created job template {@link NameIdentifier}
+ */
+ public static NameIdentifier ofJobTemplate(String metalake, String
jobTemplateName) {
+ return NameIdentifier.of(NamespaceUtil.ofJobTemplate(metalake),
jobTemplateName);
+ }
+
+ /**
+ * Create the job {@link NameIdentifier} with the given metalake and job name. The
+ * identifier's namespace is the one produced by {@code NamespaceUtil.ofJob(metalake)}.
+ *
+ * @param metalake The metalake name
+ * @param jobName The job name
+ * @return The created job {@link NameIdentifier}
+ */
+ public static NameIdentifier ofJob(String metalake, String jobName) {
+ return NameIdentifier.of(NamespaceUtil.ofJob(metalake), jobName);
+ }
+
/**
* Try to get the catalog {@link NameIdentifier} from the given {@link
NameIdentifier}.
*
diff --git a/core/src/main/java/org/apache/gravitino/utils/NamespaceUtil.java
b/core/src/main/java/org/apache/gravitino/utils/NamespaceUtil.java
index 63743c635b..c294a90145 100644
--- a/core/src/main/java/org/apache/gravitino/utils/NamespaceUtil.java
+++ b/core/src/main/java/org/apache/gravitino/utils/NamespaceUtil.java
@@ -185,6 +185,27 @@ public class NamespaceUtil {
return Namespace.of(metalake, catalog, schema, model);
}
+ /**
+ * Create a namespace for job template.
+ *
+ * <p>The namespace has three levels: the metalake name, {@code
+ * Entity.SYSTEM_CATALOG_RESERVED_NAME} and {@code Entity.JOB_TEMPLATE_SCHEMA_NAME}.
+ *
+ * @param metalake The metalake name
+ * @return A namespace for job template
+ */
+ public static Namespace ofJobTemplate(String metalake) {
+ return Namespace.of(
+ metalake, Entity.SYSTEM_CATALOG_RESERVED_NAME,
Entity.JOB_TEMPLATE_SCHEMA_NAME);
+ }
+
+ /**
+ * Create a namespace for job.
+ *
+ * <p>The namespace has three levels: the metalake name, {@code
+ * Entity.SYSTEM_CATALOG_RESERVED_NAME} and {@code Entity.JOB_SCHEMA_NAME}.
+ *
+ * @param metalake The metalake name
+ * @return A namespace for job
+ */
+ public static Namespace ofJob(String metalake) {
+ return Namespace.of(metalake, Entity.SYSTEM_CATALOG_RESERVED_NAME,
Entity.JOB_SCHEMA_NAME);
+ }
+
/**
* Convert a model name identifier to a model version namespace.
*
diff --git a/core/src/test/java/org/apache/gravitino/job/TestJobManager.java
b/core/src/test/java/org/apache/gravitino/job/TestJobManager.java
new file mode 100644
index 0000000000..079f84aec3
--- /dev/null
+++ b/core/src/test/java/org/apache/gravitino/job/TestJobManager.java
@@ -0,0 +1,438 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.job;
+
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.Lists;
+import java.io.File;
+import java.io.IOException;
+import java.time.Instant;
+import java.util.List;
+import java.util.Optional;
+import java.util.Random;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.gravitino.Config;
+import org.apache.gravitino.Configs;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.EntityAlreadyExistsException;
+import org.apache.gravitino.EntityStore;
+import org.apache.gravitino.GravitinoEnv;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.SupportsRelationOperations;
+import org.apache.gravitino.exceptions.JobTemplateAlreadyExistsException;
+import org.apache.gravitino.exceptions.MetalakeInUseException;
+import org.apache.gravitino.exceptions.NoSuchEntityException;
+import org.apache.gravitino.exceptions.NoSuchJobException;
+import org.apache.gravitino.exceptions.NoSuchJobTemplateException;
+import org.apache.gravitino.exceptions.NoSuchMetalakeException;
+import org.apache.gravitino.lock.LockManager;
+import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.meta.JobEntity;
+import org.apache.gravitino.meta.JobTemplateEntity;
+import org.apache.gravitino.metalake.MetalakeManager;
+import org.apache.gravitino.utils.NameIdentifierUtil;
+import org.apache.gravitino.utils.NamespaceUtil;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+
+public class TestJobManager {
+
+ private static JobManager jobManager;
+
+ private static EntityStore entityStore;
+
+ private static Config config;
+
+ private static String testStagingDir;
+
+ private static String metalake = "test_metalake";
+
+ private static NameIdentifier metalakeIdent = NameIdentifier.of(metalake);
+
+ private static MockedStatic<MetalakeManager> mockedMetalake;
+
+  /**
+   * Creates the shared fixtures once for the whole class: a config pointing at a randomly named
+   * staging directory, a lock manager injected into {@link GravitinoEnv}, a mocked entity store,
+   * a spied {@link JobManager} and the static mock of {@link MetalakeManager}.
+   */
+  @BeforeAll
+  public static void setUp() throws IllegalAccessException {
+    config = new Config(false) {};
+    testStagingDir = "test_staging_dir_" + new Random().nextInt(1000);
+    config.set(Configs.JOB_STAGING_DIR, testStagingDir);
+
+    // The lock manager field is written reflectively (forceAccess = true).
+    GravitinoEnv env = GravitinoEnv.getInstance();
+    LockManager lockManager = new LockManager(config);
+    FieldUtils.writeField(env, "lockManager", lockManager, true);
+
+    entityStore = Mockito.mock(EntityStore.class);
+    jobManager = Mockito.spy(new JobManager(config, entityStore));
+    mockedMetalake = mockStatic(MetalakeManager.class);
+  }
+
+  /**
+   * Releases the shared fixtures created in {@code setUp}.
+   *
+   * <p>Each step runs even if an earlier one throws. In particular the static mock of {@link
+   * MetalakeManager} is always closed, so it cannot stay registered after this class finishes.
+   *
+   * @throws Exception if closing the job manager or deleting the staging directory fails
+   */
+  @AfterAll
+  public static void tearDown() throws Exception {
+    try {
+      jobManager.close();
+    } finally {
+      try {
+        FileUtils.deleteDirectory(new File(testStagingDir));
+      } finally {
+        mockedMetalake.close();
+      }
+    }
+  }
+
+  /**
+   * Clears all stubbing recorded by a test so the next test starts from a clean state.
+   *
+   * <p>The job manager spy is shared by every test and some tests stub its methods (for example
+   * {@code getJobTemplate}). Without resetting it those stubs would leak into later tests and
+   * make the results depend on execution order. Resetting a spy only drops its stubbing; it
+   * keeps delegating to the real object afterwards.
+   */
+  @AfterEach
+  public void reset() {
+    // Reset the mocked static methods after each test
+    mockedMetalake.reset();
+    Mockito.reset(entityStore);
+    Mockito.reset(jobManager);
+  }
+
+  /**
+   * Covers listing job templates: the successful listing, a missing metalake, an entity store
+   * failure and a metalake that is in use.
+   */
+  @Test
+  public void testListJobTemplates() throws IOException {
+    mockedMetalake
+        .when(() -> MetalakeManager.checkMetalake(metalakeIdent, entityStore))
+        .thenAnswer(a -> null);
+    JobTemplateEntity shellJobTemplate =
+        newShellJobTemplateEntity("shell_job", "A shell job template");
+    JobTemplateEntity sparkJobTemplate =
+        newSparkJobTemplateEntity("spark_job", "A spark job template");
+
+    when(entityStore.list(
+            NamespaceUtil.ofJobTemplate(metalake),
+            JobTemplateEntity.class,
+            Entity.EntityType.JOB_TEMPLATE))
+        .thenReturn(Lists.newArrayList(shellJobTemplate, sparkJobTemplate));
+
+    List<JobTemplateEntity> templates = jobManager.listJobTemplates(metalake);
+    Assertions.assertEquals(2, templates.size());
+    Assertions.assertTrue(templates.contains(shellJobTemplate));
+    Assertions.assertTrue(templates.contains(sparkJobTemplate));
+
+    // Throw exception if metalake does not exist
+    mockedMetalake
+        .when(() -> MetalakeManager.checkMetalake(NameIdentifier.of("non_existent"), entityStore))
+        .thenThrow(new NoSuchMetalakeException("Metalake does not exist"));
+
+    Exception e =
+        Assertions.assertThrows(
+            NoSuchMetalakeException.class, () -> jobManager.listJobTemplates("non_existent"));
+    Assertions.assertEquals("Metalake does not exist", e.getMessage());
+
+    // Throw exception if entity store fails. This must run while the metalake check still
+    // succeeds; once the check is stubbed to throw, the call fails before reaching the store
+    // and the assertion below would pass for the wrong reason.
+    doThrow(new IOException("Entity store error"))
+        .when(entityStore)
+        .list(
+            NamespaceUtil.ofJobTemplate(metalake),
+            JobTemplateEntity.class,
+            Entity.EntityType.JOB_TEMPLATE);
+
+    Assertions.assertThrows(RuntimeException.class, () -> jobManager.listJobTemplates(metalake));
+
+    // Throw exception if metalake is in use
+    mockedMetalake
+        .when(() -> MetalakeManager.checkMetalake(metalakeIdent, entityStore))
+        .thenThrow(new MetalakeInUseException("Metalake is in use"));
+
+    e =
+        Assertions.assertThrows(
+            MetalakeInUseException.class, () -> jobManager.listJobTemplates(metalake));
+    Assertions.assertEquals("Metalake is in use", e.getMessage());
+  }
+
+  /**
+   * Covers registering a job template: the successful registration, a duplicate template, a
+   * missing metalake, an entity store failure and a metalake that is in use.
+   */
+  @Test
+  public void testRegisterJobTemplate() throws IOException {
+    mockedMetalake
+        .when(() -> MetalakeManager.checkMetalake(metalakeIdent, entityStore))
+        .thenAnswer(a -> null);
+
+    JobTemplateEntity shellJobTemplate =
+        newShellJobTemplateEntity("shell_job", "A shell job template");
+    doNothing().when(entityStore).put(shellJobTemplate, false);
+
+    // Register a new job template
+    Assertions.assertDoesNotThrow(
+        () -> jobManager.registerJobTemplate(metalake, shellJobTemplate));
+
+    // Throw exception if job template already exists
+    doThrow(new EntityAlreadyExistsException("Job template already exists"))
+        .when(entityStore)
+        .put(shellJobTemplate, false /* overwrite */);
+
+    Exception e =
+        Assertions.assertThrows(
+            JobTemplateAlreadyExistsException.class,
+            () -> jobManager.registerJobTemplate(metalake, shellJobTemplate));
+    Assertions.assertEquals(
+        "Job template with name shell_job under metalake test_metalake already exists",
+        e.getMessage());
+
+    // Throw exception if metalake does not exist
+    mockedMetalake
+        .when(() -> MetalakeManager.checkMetalake(NameIdentifier.of("non_existent"), entityStore))
+        .thenThrow(new NoSuchMetalakeException("Metalake does not exist"));
+
+    e =
+        Assertions.assertThrows(
+            NoSuchMetalakeException.class,
+            () -> jobManager.registerJobTemplate("non_existent", shellJobTemplate));
+    Assertions.assertEquals("Metalake does not exist", e.getMessage());
+
+    // Throw exception if entity store fails. This must run while the metalake check still
+    // succeeds; once the check is stubbed to throw, the call fails before reaching the store
+    // and the assertion below would pass for the wrong reason.
+    doThrow(new IOException("Entity store error"))
+        .when(entityStore)
+        .put(shellJobTemplate, false /* overwrite */);
+
+    Assertions.assertThrows(
+        RuntimeException.class, () -> jobManager.registerJobTemplate(metalake, shellJobTemplate));
+
+    // Throw exception if metalake is in use
+    mockedMetalake
+        .when(() -> MetalakeManager.checkMetalake(metalakeIdent, entityStore))
+        .thenThrow(new MetalakeInUseException("Metalake is in use"));
+
+    e =
+        Assertions.assertThrows(
+            MetalakeInUseException.class,
+            () -> jobManager.registerJobTemplate(metalake, shellJobTemplate));
+    Assertions.assertEquals("Metalake is in use", e.getMessage());
+  }
+
+  /**
+   * Covers fetching a single job template: an existing template, a template the entity store
+   * does not know, and an entity store failure.
+   */
+  @Test
+  public void testGetJobTemplate() throws IOException {
+    mockedMetalake
+        .when(() -> MetalakeManager.checkMetalake(metalakeIdent, entityStore))
+        .thenAnswer(a -> null);
+
+    JobTemplateEntity shellJobTemplate =
+        newShellJobTemplateEntity("shell_job", "A shell job template");
+    when(entityStore.get(
+            NameIdentifierUtil.ofJobTemplate(metalake, shellJobTemplate.name()),
+            Entity.EntityType.JOB_TEMPLATE,
+            JobTemplateEntity.class))
+        .thenReturn(shellJobTemplate);
+
+    // Get an existing job template
+    JobTemplateEntity retrievedTemplate = jobManager.getJobTemplate(metalake, "shell_job");
+    Assertions.assertEquals(shellJobTemplate, retrievedTemplate);
+
+    // Throw exception if job template does not exist
+    when(entityStore.get(
+            NameIdentifierUtil.ofJobTemplate(metalake, "non_existent"),
+            Entity.EntityType.JOB_TEMPLATE,
+            JobTemplateEntity.class))
+        .thenThrow(new NoSuchEntityException("Job template does not exist"));
+
+    Exception e =
+        Assertions.assertThrows(
+            NoSuchJobTemplateException.class,
+            () -> jobManager.getJobTemplate(metalake, "non_existent"));
+    Assertions.assertEquals(
+        "Job template with name non_existent under metalake test_metalake does not exist",
+        e.getMessage());
+
+    // Throw exception if entity store fails
+    doThrow(new IOException("Entity store error"))
+        .when(entityStore)
+        .get(
+            NameIdentifierUtil.ofJobTemplate(metalake, "job"),
+            Entity.EntityType.JOB_TEMPLATE,
+            JobTemplateEntity.class);
+    Assertions.assertThrows(
+        RuntimeException.class, () -> jobManager.getJobTemplate(metalake, "job"));
+  }
+
+  /**
+   * Covers deleting a job template: the store reporting a deletion, the store reporting that
+   * nothing was deleted, and an entity store failure.
+   */
+  @Test
+  public void testDeleteJobTemplate() throws IOException {
+    mockedMetalake
+        .when(() -> MetalakeManager.checkMetalake(metalakeIdent, entityStore))
+        .thenAnswer(a -> null);
+
+    JobTemplateEntity shellJobTemplate =
+        newShellJobTemplateEntity("shell_job", "A shell job template");
+    NameIdentifier shellJobIdent =
+        NameIdentifierUtil.ofJobTemplate(metalake, shellJobTemplate.name());
+
+    // Delete an existing job template
+    doReturn(true).when(entityStore).delete(shellJobIdent, Entity.EntityType.JOB_TEMPLATE);
+    boolean deletedExisting = jobManager.deleteJobTemplate(metalake, "shell_job");
+    Assertions.assertTrue(deletedExisting);
+
+    // Delete a non-existing job template
+    doReturn(false)
+        .when(entityStore)
+        .delete(
+            NameIdentifierUtil.ofJobTemplate(metalake, "shell_job"),
+            Entity.EntityType.JOB_TEMPLATE);
+    boolean deletedMissing = jobManager.deleteJobTemplate(metalake, "shell_job");
+    Assertions.assertFalse(deletedMissing);
+
+    // Throw exception if entity store fails
+    NameIdentifier failingIdent = NameIdentifierUtil.ofJobTemplate(metalake, "job");
+    doThrow(new IOException("Entity store error"))
+        .when(entityStore)
+        .delete(failingIdent, Entity.EntityType.JOB_TEMPLATE);
+    Assertions.assertThrows(
+        RuntimeException.class, () -> jobManager.deleteJobTemplate(metalake, "job"));
+  }
+
+  /**
+   * Covers listing jobs: filtered by job template, unfiltered, filtered by a template that does
+   * not exist, and an entity store failure.
+   */
+  @Test
+  public void testListJobs() throws IOException {
+    mockedMetalake
+        .when(() -> MetalakeManager.checkMetalake(metalakeIdent, entityStore))
+        .thenAnswer(a -> null);
+
+    JobTemplateEntity shellJobTemplate =
+        newShellJobTemplateEntity("shell_job", "A shell job template");
+    // jobManager is a spy: stub it with doReturn(..).when(..) so the real getJobTemplate is not
+    // executed while the stubbing is being recorded.
+    doReturn(shellJobTemplate)
+        .when(jobManager)
+        .getJobTemplate(metalake, shellJobTemplate.name());
+
+    JobEntity job1 = newJobEntity("shell_job");
+    JobEntity job2 = newJobEntity("spark_job");
+
+    SupportsRelationOperations supportsRelationOperations =
+        Mockito.mock(SupportsRelationOperations.class);
+    when(supportsRelationOperations.listEntitiesByRelation(
+            SupportsRelationOperations.Type.JOB_TEMPLATE_JOB_REL,
+            NameIdentifierUtil.ofJobTemplate(metalake, shellJobTemplate.name()),
+            Entity.EntityType.JOB_TEMPLATE))
+        .thenReturn(Lists.newArrayList(job1));
+    when(entityStore.relationOperations()).thenReturn(supportsRelationOperations);
+
+    // Stub the entity store listing of the job namespace to return both jobs
+    when(entityStore.list(NamespaceUtil.ofJob(metalake), JobEntity.class, Entity.EntityType.JOB))
+        .thenReturn(Lists.newArrayList(job1, job2));
+
+    // List the jobs that belong to the shell job template
+    List<JobEntity> jobs = jobManager.listJobs(metalake, Optional.of(shellJobTemplate.name()));
+    Assertions.assertEquals(1, jobs.size());
+    Assertions.assertTrue(jobs.contains(job1));
+    Assertions.assertFalse(jobs.contains(job2));
+
+    // List all jobs without filtering by job template
+    jobs = jobManager.listJobs(metalake, Optional.empty());
+    Assertions.assertEquals(2, jobs.size());
+    Assertions.assertTrue(jobs.contains(job1));
+    Assertions.assertTrue(jobs.contains(job2));
+
+    // Throw exception if job template does not exist
+    doThrow(new NoSuchJobTemplateException("Job template does not exist"))
+        .when(jobManager)
+        .getJobTemplate(metalake, "non_existent");
+
+    Exception e =
+        Assertions.assertThrows(
+            NoSuchJobTemplateException.class,
+            () -> jobManager.listJobs(metalake, Optional.of("non_existent")));
+    Assertions.assertEquals("Job template does not exist", e.getMessage());
+
+    // Throw exception if entity store fails
+    doThrow(new IOException("Entity store error"))
+        .when(entityStore)
+        .list(NamespaceUtil.ofJob(metalake), JobEntity.class, Entity.EntityType.JOB);
+    Assertions.assertThrows(
+        RuntimeException.class, () -> jobManager.listJobs(metalake, Optional.empty()));
+  }
+
+  /**
+   * Covers fetching a single job: an existing job, a job the entity store does not know, and an
+   * entity store failure.
+   */
+  @Test
+  public void testGetJob() throws IOException {
+    mockedMetalake
+        .when(() -> MetalakeManager.checkMetalake(metalakeIdent, entityStore))
+        .thenAnswer(a -> null);
+
+    JobEntity job = newJobEntity("shell_job");
+    when(entityStore.get(
+            NameIdentifierUtil.ofJob(metalake, job.name()), Entity.EntityType.JOB, JobEntity.class))
+        .thenReturn(job);
+
+    // Get an existing job
+    JobEntity retrievedJob = jobManager.getJob(metalake, job.name());
+    Assertions.assertEquals(job, retrievedJob);
+
+    // Throw exception if job does not exist
+    when(entityStore.get(
+            NameIdentifierUtil.ofJob(metalake, "non_existent"),
+            Entity.EntityType.JOB,
+            JobEntity.class))
+        .thenThrow(new NoSuchEntityException("Job does not exist"));
+
+    Exception e =
+        Assertions.assertThrows(
+            NoSuchJobException.class, () -> jobManager.getJob(metalake, "non_existent"));
+    Assertions.assertEquals(
+        "Job with ID non_existent under metalake test_metalake does not exist", e.getMessage());
+
+    // Throw exception if entity store fails
+    doThrow(new IOException("Entity store error"))
+        .when(entityStore)
+        .get(NameIdentifierUtil.ofJob(metalake, "job"), Entity.EntityType.JOB, JobEntity.class);
+    Assertions.assertThrows(RuntimeException.class, () -> jobManager.getJob(metalake, "job"));
+  }
+
+  /**
+   * Creates a shell job template entity for the test metalake.
+   *
+   * <p>The entity gets a random id, the job template namespace of the test metalake, template
+   * content derived from a shell job template that runs {@code /bin/echo}, and audit info with
+   * creator {@code test}.
+   *
+   * @param name the name of the job template
+   * @param comment the comment stored on both the shell job template and the entity
+   * @return the built job template entity
+   */
+  private static JobTemplateEntity newShellJobTemplateEntity(String name, String comment) {
+    ShellJobTemplate shellJobTemplate =
+        new ShellJobTemplate.Builder()
+            .withName(name)
+            .withComment(comment)
+            .withExecutable("/bin/echo")
+            .build();
+
+    Random rand = new Random();
+    return JobTemplateEntity.builder()
+        .withId(rand.nextLong())
+        .withName(name)
+        .withNamespace(NamespaceUtil.ofJobTemplate(metalake))
+        // The template content does not carry the comment, so set it on the entity explicitly.
+        .withComment(comment)
+        .withTemplateContent(JobTemplateEntity.TemplateContent.fromJobTemplate(shellJobTemplate))
+        .withAuditInfo(
+            AuditInfo.builder().withCreator("test").withCreateTime(Instant.now()).build())
+        .build();
+  }
+
+  /**
+   * Creates a Spark job template entity for the test metalake.
+   *
+   * <p>The entity gets a random id, the job template namespace of the test metalake, template
+   * content derived from a Spark job template for {@code org.apache.spark.examples.SparkPi},
+   * and audit info with creator {@code test}.
+   *
+   * @param name the name of the job template
+   * @param comment the comment stored on both the Spark job template and the entity
+   * @return the built job template entity
+   */
+  private static JobTemplateEntity newSparkJobTemplateEntity(String name, String comment) {
+    SparkJobTemplate sparkJobTemplate =
+        new SparkJobTemplate.Builder()
+            .withName(name)
+            .withComment(comment)
+            .withClassName("org.apache.spark.examples.SparkPi")
+            .withExecutable("local:///path/to/spark-examples.jar")
+            .build();
+
+    Random rand = new Random();
+    return JobTemplateEntity.builder()
+        .withId(rand.nextLong())
+        .withName(name)
+        .withNamespace(NamespaceUtil.ofJobTemplate(metalake))
+        // The template content does not carry the comment, so set it on the entity explicitly.
+        .withComment(comment)
+        .withTemplateContent(JobTemplateEntity.TemplateContent.fromJobTemplate(sparkJobTemplate))
+        .withAuditInfo(
+            AuditInfo.builder().withCreator("test").withCreateTime(Instant.now()).build())
+        .build();
+  }
+
+  /**
+   * Creates a queued job entity for the test metalake.
+   *
+   * <p>The entity gets a random id, the job namespace of the test metalake, status {@code
+   * QUEUED} and audit info with creator {@code test}.
+   *
+   * @param templateName the name of the job template the job refers to
+   * @return the built job entity
+   */
+  private static JobEntity newJobEntity(String templateName) {
+    long jobId = new Random().nextLong();
+    AuditInfo createdByTest =
+        AuditInfo.builder().withCreator("test").withCreateTime(Instant.now()).build();
+
+    return JobEntity.builder()
+        .withId(jobId)
+        .withNamespace(NamespaceUtil.ofJob(metalake))
+        .withJobTemplateName(templateName)
+        .withStatus(JobHandle.Status.QUEUED)
+        .withAuditInfo(createdByTest)
+        .build();
+  }
+}