This is an automated email from the ASF dual-hosted git repository.
yuqi4733 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new cda8017dd4 [#7507] feat(core): Add storage layout for job system
(#7874)
cda8017dd4 is described below
commit cda8017dd4cb09908b801886d764061275e047f5
Author: Jerry Shao <[email protected]>
AuthorDate: Mon Aug 4 19:57:28 2025 +0800
[#7507] feat(core): Add storage layout for job system (#7874)
### What changes were proposed in this pull request?
This PR adds the storage layout for job system to store the job template
and job to the Gravitino's store.
### Why are the changes needed?
Fix: #7507
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
UTs to cover the code.
---
.../java/org/apache/gravitino/job/JobManager.java | 31 ++-
.../gravitino/job/local/LocalJobExecutor.java | 5 +
.../gravitino/storage/relational/JDBCBackend.java | 37 ++-
.../storage/relational/mapper/JobMetaMapper.java | 70 ++++++
.../mapper/JobMetaSQLProviderFactory.java | 97 ++++++++
.../relational/mapper/JobTemplateMetaMapper.java | 68 ++++++
.../mapper/JobTemplateMetaSQLProviderFactory.java | 90 ++++++++
.../provider/base/JobMetaBaseSQLProvider.java | 176 ++++++++++++++
.../base/JobTemplateMetaBaseSQLProvider.java | 123 ++++++++++
.../postgresql/JobMetaPostgreSQLProvider.java | 107 +++++++++
.../JobTemplateMetaPostgreSQLProvider.java | 84 +++++++
.../gravitino/storage/relational/po/JobPO.java | 143 ++++++++++++
.../storage/relational/po/JobTemplatePO.java | 133 +++++++++++
.../storage/relational/service/JobMetaService.java | 124 ++++++++++
.../relational/service/JobTemplateMetaService.java | 133 +++++++++++
.../relational/service/MetalakeMetaService.java | 22 +-
.../storage/relational/utils/POConverters.java | 4 +-
.../gravitino/storage/relational/po/TestJobPO.java | 144 ++++++++++++
.../relational/service/TestJobMetaService.java | 183 +++++++++++++++
.../service/TestJobTemplateMetaService.java | 257 +++++++++++++++++++++
scripts/h2/schema-1.0.0-h2.sql | 31 +++
scripts/h2/upgrade-0.9.0-to-1.0.0-h2.sql | 33 ++-
scripts/mysql/schema-1.0.0-mysql.sql | 33 ++-
scripts/mysql/upgrade-0.9.0-to-1.0.0-mysql.sql | 31 +++
scripts/postgresql/schema-1.0.0-postgresql.sql | 56 +++++
.../upgrade-0.9.0-to-1.0.0-postgresql.sql | 56 +++++
26 files changed, 2250 insertions(+), 21 deletions(-)
diff --git a/core/src/main/java/org/apache/gravitino/job/JobManager.java
b/core/src/main/java/org/apache/gravitino/job/JobManager.java
index 2b4120ec64..8bb193a7a3 100644
--- a/core/src/main/java/org/apache/gravitino/job/JobManager.java
+++ b/core/src/main/java/org/apache/gravitino/job/JobManager.java
@@ -335,10 +335,12 @@ public class JobManager implements
JobOperationDispatcher, Closeable {
// Retrieve the job entity, will throw NoSuchJobException if the job does
not exist.
JobEntity jobEntity = getJob(metalake, jobId);
- if (jobEntity.status() == JobHandle.Status.CANCELLED
+ if (jobEntity.status() == JobHandle.Status.CANCELLING
+ || jobEntity.status() == JobHandle.Status.CANCELLED
|| jobEntity.status() == JobHandle.Status.SUCCEEDED
|| jobEntity.status() == JobHandle.Status.FAILED) {
- // If the job is already cancelled, succeeded, or failed, we do not need
to cancel it again.
+ // If the job is already cancelling, cancelled, succeeded, or failed, we
do not need to cancel
+ // it again.
return jobEntity;
}
@@ -350,6 +352,8 @@ public class JobManager implements JobOperationDispatcher,
Closeable {
String.format("Failed to cancel job with ID %s under metalake %s",
jobId, metalake), e);
}
+ // TODO(jerry). Implement a background thread to monitor the job status
and update it. Also,
+ // we should delete the finished job entities after a certain period of
time.
// Update the job status to CANCELING
JobEntity newJobEntity =
JobEntity.builder()
@@ -366,15 +370,20 @@ public class JobManager implements
JobOperationDispatcher, Closeable {
.withLastModifiedTime(Instant.now())
.build())
.build();
-
- try {
- // Update the job entity in the entity store
- entityStore.put(newJobEntity, true /* overwrite */);
- return newJobEntity;
- } catch (IOException e) {
- throw new RuntimeException(
- String.format("Failed to update job entity %s to CANCELING status",
newJobEntity), e);
- }
+ return TreeLockUtils.doWithTreeLock(
+ NameIdentifierUtil.ofJob(metalake, jobId),
+ LockType.WRITE,
+ () -> {
+ try {
+ // Update the job entity in the entity store
+ entityStore.put(newJobEntity, true /* overwrite */);
+ return newJobEntity;
+ } catch (IOException e) {
+ throw new RuntimeException(
+ String.format("Failed to update job entity %s to CANCELING
status", newJobEntity),
+ e);
+ }
+ });
}
@Override
diff --git
a/core/src/main/java/org/apache/gravitino/job/local/LocalJobExecutor.java
b/core/src/main/java/org/apache/gravitino/job/local/LocalJobExecutor.java
index b8e41b851e..5d114a4605 100644
--- a/core/src/main/java/org/apache/gravitino/job/local/LocalJobExecutor.java
+++ b/core/src/main/java/org/apache/gravitino/job/local/LocalJobExecutor.java
@@ -198,6 +198,11 @@ public class LocalJobExecutor implements JobExecutor {
return;
}
+ if (statusPair.getLeft() == JobHandle.Status.CANCELLING) {
+ LOG.warn("Job {} is already being cancelled, no action taken", jobId);
+ return;
+ }
+
// If the job is queued.
if (statusPair.getLeft() == JobHandle.Status.QUEUED) {
waitingQueue.removeIf(p -> p.getLeft().equals(jobId));
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java
b/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java
index 966b4e3250..b9440c9ae9 100644
---
a/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java
@@ -42,6 +42,8 @@ import org.apache.gravitino.meta.BaseMetalake;
import org.apache.gravitino.meta.CatalogEntity;
import org.apache.gravitino.meta.FilesetEntity;
import org.apache.gravitino.meta.GroupEntity;
+import org.apache.gravitino.meta.JobEntity;
+import org.apache.gravitino.meta.JobTemplateEntity;
import org.apache.gravitino.meta.ModelEntity;
import org.apache.gravitino.meta.ModelVersionEntity;
import org.apache.gravitino.meta.PolicyEntity;
@@ -56,6 +58,8 @@ import
org.apache.gravitino.storage.relational.database.H2Database;
import org.apache.gravitino.storage.relational.service.CatalogMetaService;
import org.apache.gravitino.storage.relational.service.FilesetMetaService;
import org.apache.gravitino.storage.relational.service.GroupMetaService;
+import org.apache.gravitino.storage.relational.service.JobMetaService;
+import org.apache.gravitino.storage.relational.service.JobTemplateMetaService;
import org.apache.gravitino.storage.relational.service.MetalakeMetaService;
import org.apache.gravitino.storage.relational.service.ModelMetaService;
import org.apache.gravitino.storage.relational.service.ModelVersionMetaService;
@@ -128,6 +132,11 @@ public class JDBCBackend implements RelationalBackend {
ModelVersionMetaService.getInstance().listModelVersionsByNamespace(namespace);
case POLICY:
return (List<E>)
PolicyMetaService.getInstance().listPoliciesByNamespace(namespace);
+ case JOB_TEMPLATE:
+ return (List<E>)
+
JobTemplateMetaService.getInstance().listJobTemplatesByNamespace(namespace);
+ case JOB:
+ return (List<E>)
JobMetaService.getInstance().listJobsByNamespace(namespace);
default:
throw new UnsupportedEntityTypeException(
"Unsupported entity type: %s for list operation", entityType);
@@ -178,6 +187,10 @@ public class JDBCBackend implements RelationalBackend {
ModelVersionMetaService.getInstance().insertModelVersion((ModelVersionEntity)
e);
} else if (e instanceof PolicyEntity) {
PolicyMetaService.getInstance().insertPolicy((PolicyEntity) e,
overwritten);
+ } else if (e instanceof JobTemplateEntity) {
+
JobTemplateMetaService.getInstance().insertJobTemplate((JobTemplateEntity) e,
overwritten);
+ } else if (e instanceof JobEntity) {
+ JobMetaService.getInstance().insertJob((JobEntity) e, overwritten);
} else {
throw new UnsupportedEntityTypeException(
"Unsupported entity type: %s for insert operation", e.getClass());
@@ -252,6 +265,10 @@ public class JDBCBackend implements RelationalBackend {
return (E)
ModelVersionMetaService.getInstance().getModelVersionByIdentifier(ident);
case POLICY:
return (E)
PolicyMetaService.getInstance().getPolicyByIdentifier(ident);
+ case JOB_TEMPLATE:
+ return (E)
JobTemplateMetaService.getInstance().getJobTemplateByIdentifier(ident);
+ case JOB:
+ return (E) JobMetaService.getInstance().getJobByIdentifier(ident);
default:
throw new UnsupportedEntityTypeException(
"Unsupported entity type: %s for get operation", entityType);
@@ -288,6 +305,8 @@ public class JDBCBackend implements RelationalBackend {
return ModelVersionMetaService.getInstance().deleteModelVersion(ident);
case POLICY:
return PolicyMetaService.getInstance().deletePolicy(ident);
+ case JOB_TEMPLATE:
+ return JobTemplateMetaService.getInstance().deleteJobTemplate(ident);
default:
throw new UnsupportedEntityTypeException(
"Unsupported entity type: %s for delete operation", entityType);
@@ -354,11 +373,12 @@ public class JDBCBackend implements RelationalBackend {
.deleteModelVersionMetasByLegacyTimeline(
legacyTimeline, GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT);
case JOB_TEMPLATE:
- // TODO: Implement hard delete logic for job templates.
- return 0;
+ return JobTemplateMetaService.getInstance()
+ .deleteJobTemplatesByLegacyTimeline(
+ legacyTimeline, GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT);
case JOB:
- // TODO: Implement hard delete logic for jobs.
- return 0;
+ return JobMetaService.getInstance()
+ .deleteJobsByLegacyTimeline(legacyTimeline,
GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT);
case AUDIT:
return 0;
// TODO: Implement hard delete logic for these entity types.
@@ -478,6 +498,15 @@ public class JDBCBackend implements RelationalBackend {
throw new IllegalArgumentException(
String.format("ROLE_USER_REL doesn't support type %s",
identType.name()));
}
+
+ case JOB_TEMPLATE_JOB_REL:
+ if (identType == Entity.EntityType.JOB_TEMPLATE) {
+ return (List<E>)
JobMetaService.getInstance().listJobsByTemplateIdent(nameIdentifier);
+ } else {
+ throw new IllegalArgumentException(
+ String.format("JOB_TEMPLATE_JOB_REL doesn't support type %s",
identType.name()));
+ }
+
default:
throw new IllegalArgumentException(
String.format("Doesn't support the relation type %s", relType));
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/JobMetaMapper.java
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/JobMetaMapper.java
new file mode 100644
index 0000000000..a8a45e3888
--- /dev/null
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/JobMetaMapper.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.mapper;
+
+import java.util.List;
+import org.apache.gravitino.storage.relational.po.JobPO;
+import org.apache.ibatis.annotations.DeleteProvider;
+import org.apache.ibatis.annotations.InsertProvider;
+import org.apache.ibatis.annotations.Param;
+import org.apache.ibatis.annotations.SelectProvider;
+import org.apache.ibatis.annotations.UpdateProvider;
+
+public interface JobMetaMapper {
+ String TABLE_NAME = "job_run_meta";
+
+ @InsertProvider(type = JobMetaSQLProviderFactory.class, method =
"insertJobMeta")
+ void insertJobMeta(@Param("jobMeta") JobPO jobPO);
+
+ @InsertProvider(
+ type = JobMetaSQLProviderFactory.class,
+ method = "insertJobMetaOnDuplicateKeyUpdate")
+ void insertJobMetaOnDuplicateKeyUpdate(@Param("jobMeta") JobPO jobPO);
+
+ @SelectProvider(type = JobMetaSQLProviderFactory.class, method =
"listJobPOsByMetalake")
+ List<JobPO> listJobPOsByMetalake(@Param("metalakeName") String metalakeName);
+
+ @SelectProvider(
+ type = JobMetaSQLProviderFactory.class,
+ method = "listJobPOsByMetalakeAndTemplate")
+ List<JobPO> listJobPOsByMetalakeAndTemplate(
+ @Param("metalakeName") String metalakeName, @Param("jobTemplateName")
String jobTemplateName);
+
+ @SelectProvider(type = JobMetaSQLProviderFactory.class, method =
"selectJobPOByMetalakeAndRunId")
+ JobPO selectJobPOByMetalakeAndRunId(
+ @Param("metalakeName") String metalakeName, @Param("jobRunId") Long
jobRunId);
+
+ @UpdateProvider(
+ type = JobMetaSQLProviderFactory.class,
+ method = "softDeleteJobMetaByMetalakeAndTemplate")
+ Integer softDeleteJobMetaByMetalakeAndTemplate(
+ @Param("metalakeName") String metalakeName, @Param("jobTemplateName")
String jobTemplateName);
+
+ @UpdateProvider(type = JobMetaSQLProviderFactory.class, method =
"softDeleteJobMetasByMetalakeId")
+ void softDeleteJobMetasByMetalakeId(@Param("metalakeId") Long metalakeId);
+
+ @UpdateProvider(
+ type = JobMetaSQLProviderFactory.class,
+ method = "softDeleteJobMetasByLegacyTimeline")
+ void softDeleteJobMetasByLegacyTimeline(@Param("legacyTimeline") Long
legacyTimeline);
+
+ @DeleteProvider(type = JobMetaSQLProviderFactory.class, method =
"deleteJobMetasByLegacyTimeline")
+ Integer deleteJobMetasByLegacyTimeline(
+ @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit);
+}
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/JobMetaSQLProviderFactory.java
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/JobMetaSQLProviderFactory.java
new file mode 100644
index 0000000000..25a4c924f6
--- /dev/null
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/JobMetaSQLProviderFactory.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.mapper;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import org.apache.gravitino.storage.relational.JDBCBackend;
+import
org.apache.gravitino.storage.relational.mapper.provider.base.JobMetaBaseSQLProvider;
+import
org.apache.gravitino.storage.relational.mapper.provider.postgresql.JobMetaPostgreSQLProvider;
+import org.apache.gravitino.storage.relational.po.JobPO;
+import org.apache.gravitino.storage.relational.session.SqlSessionFactoryHelper;
+import org.apache.ibatis.annotations.Param;
+
+public class JobMetaSQLProviderFactory {
+
+ private static final Map<JDBCBackend.JDBCBackendType, JobMetaBaseSQLProvider>
+ JOB_META_SQL_PROVIDER_MAP =
+ ImmutableMap.of(
+ JDBCBackend.JDBCBackendType.MYSQL, new JobMetaMySQLProvider(),
+ JDBCBackend.JDBCBackendType.H2, new JobMetaH2Provider(),
+ JDBCBackend.JDBCBackendType.POSTGRESQL, new
JobMetaPostgreSQLProvider());
+
+ public static JobMetaBaseSQLProvider getProvider() {
+ String databaseId =
+ SqlSessionFactoryHelper.getInstance()
+ .getSqlSessionFactory()
+ .getConfiguration()
+ .getDatabaseId();
+
+ JDBCBackend.JDBCBackendType jdbcBackendType =
+ JDBCBackend.JDBCBackendType.fromString(databaseId);
+ return JOB_META_SQL_PROVIDER_MAP.get(jdbcBackendType);
+ }
+
+ static class JobMetaMySQLProvider extends JobMetaBaseSQLProvider {}
+
+ static class JobMetaH2Provider extends JobMetaBaseSQLProvider {}
+
+ public static String insertJobMeta(@Param("jobMeta") JobPO jobPO) {
+ return getProvider().insertJobMeta(jobPO);
+ }
+
+ public static String insertJobMetaOnDuplicateKeyUpdate(@Param("jobMeta")
JobPO jobPO) {
+ return getProvider().insertJobMetaOnDuplicateKeyUpdate(jobPO);
+ }
+
+ public static String listJobPOsByMetalake(@Param("metalakeName") String
metalakeName) {
+ return getProvider().listJobPOsByMetalake(metalakeName);
+ }
+
+ public static String listJobPOsByMetalakeAndTemplate(
+ @Param("metalakeName") String metalakeName,
+ @Param("jobTemplateName") String jobTemplateName) {
+ return getProvider().listJobPOsByMetalakeAndTemplate(metalakeName,
jobTemplateName);
+ }
+
+ public static String selectJobPOByMetalakeAndRunId(
+ @Param("metalakeName") String metalakeName, @Param("jobRunId") Long
jobRunId) {
+ return getProvider().selectJobPOByMetalakeAndRunId(metalakeName, jobRunId);
+ }
+
+ public static String softDeleteJobMetaByMetalakeAndTemplate(
+ @Param("metalakeName") String metalakeName,
+ @Param("jobTemplateName") String jobTemplateName) {
+ return getProvider().softDeleteJobMetaByMetalakeAndTemplate(metalakeName,
jobTemplateName);
+ }
+
+ public static String softDeleteJobMetasByMetalakeId(@Param("metalakeId")
Long metalakeId) {
+ return getProvider().softDeleteJobMetasByMetalakeId(metalakeId);
+ }
+
+ public static String deleteJobMetasByLegacyTimeline(
+ @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit)
{
+ return getProvider().deleteJobMetasByLegacyTimeline(legacyTimeline, limit);
+ }
+
+ public static String softDeleteJobMetasByLegacyTimeline(
+ @Param("legacyTimeline") Long legacyTimeline) {
+ return getProvider().softDeleteJobMetasByLegacyTimeline(legacyTimeline);
+ }
+}
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/JobTemplateMetaMapper.java
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/JobTemplateMetaMapper.java
new file mode 100644
index 0000000000..c902880908
--- /dev/null
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/JobTemplateMetaMapper.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.mapper;
+
+import java.util.List;
+import org.apache.gravitino.storage.relational.po.JobTemplatePO;
+import org.apache.ibatis.annotations.DeleteProvider;
+import org.apache.ibatis.annotations.InsertProvider;
+import org.apache.ibatis.annotations.Param;
+import org.apache.ibatis.annotations.SelectProvider;
+import org.apache.ibatis.annotations.UpdateProvider;
+
+public interface JobTemplateMetaMapper {
+ String TABLE_NAME = "job_template_meta";
+
+ @InsertProvider(type = JobTemplateMetaSQLProviderFactory.class, method =
"insertJobTemplateMeta")
+ void insertJobTemplateMeta(@Param("jobTemplateMeta") JobTemplatePO
jobTemplatePO);
+
+ @InsertProvider(
+ type = JobTemplateMetaSQLProviderFactory.class,
+ method = "insertJobTemplateMetaOnDuplicateKeyUpdate")
+ void insertJobTemplateMetaOnDuplicateKeyUpdate(
+ @Param("jobTemplateMeta") JobTemplatePO jobTemplatePO);
+
+ @SelectProvider(
+ type = JobTemplateMetaSQLProviderFactory.class,
+ method = "listJobTemplatePOsByMetalake")
+ List<JobTemplatePO> listJobTemplatePOsByMetalake(@Param("metalakeName")
String metalakeName);
+
+ @SelectProvider(
+ type = JobTemplateMetaSQLProviderFactory.class,
+ method = "selectJobTemplatePOByMetalakeAndName")
+ JobTemplatePO selectJobTemplatePOByMetalakeAndName(
+ @Param("metalakeName") String metalakeName, @Param("jobTemplateName")
String jobTemplateName);
+
+ @UpdateProvider(
+ type = JobTemplateMetaSQLProviderFactory.class,
+ method = "softDeleteJobTemplateMetaByMetalakeAndName")
+ Integer softDeleteJobTemplateMetaByMetalakeAndName(
+ @Param("metalakeName") String metalakeName, @Param("jobTemplateName")
String jobTemplateName);
+
+ @UpdateProvider(
+ type = JobTemplateMetaSQLProviderFactory.class,
+ method = "softDeleteJobTemplateMetasByMetalakeId")
+ void softDeleteJobTemplateMetasByMetalakeId(@Param("metalakeId") Long
metalakeId);
+
+ @DeleteProvider(
+ type = JobTemplateMetaSQLProviderFactory.class,
+ method = "deleteJobTemplateMetasByLegacyTimeline")
+ Integer deleteJobTemplateMetasByLegacyTimeline(
+ @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit);
+}
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/JobTemplateMetaSQLProviderFactory.java
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/JobTemplateMetaSQLProviderFactory.java
new file mode 100644
index 0000000000..64de88dc88
--- /dev/null
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/JobTemplateMetaSQLProviderFactory.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.mapper;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import org.apache.gravitino.storage.relational.JDBCBackend;
+import
org.apache.gravitino.storage.relational.mapper.provider.base.JobTemplateMetaBaseSQLProvider;
+import
org.apache.gravitino.storage.relational.mapper.provider.postgresql.JobTemplateMetaPostgreSQLProvider;
+import org.apache.gravitino.storage.relational.po.JobTemplatePO;
+import org.apache.gravitino.storage.relational.session.SqlSessionFactoryHelper;
+import org.apache.ibatis.annotations.Param;
+
+public class JobTemplateMetaSQLProviderFactory {
+
+ private static final Map<JDBCBackend.JDBCBackendType,
JobTemplateMetaBaseSQLProvider>
+ JOB_TEMPLATE_META_SQL_PROVIDER_MAP =
+ ImmutableMap.of(
+ JDBCBackend.JDBCBackendType.MYSQL, new
JobTemplateMetaMySQLProvider(),
+ JDBCBackend.JDBCBackendType.H2, new JobTemplateMetaH2Provider(),
+ JDBCBackend.JDBCBackendType.POSTGRESQL, new
JobTemplateMetaPostgreSQLProvider());
+
+ public static JobTemplateMetaBaseSQLProvider getProvider() {
+ String databaseId =
+ SqlSessionFactoryHelper.getInstance()
+ .getSqlSessionFactory()
+ .getConfiguration()
+ .getDatabaseId();
+
+ JDBCBackend.JDBCBackendType jdbcBackendType =
+ JDBCBackend.JDBCBackendType.fromString(databaseId);
+ return JOB_TEMPLATE_META_SQL_PROVIDER_MAP.get(jdbcBackendType);
+ }
+
+ static class JobTemplateMetaMySQLProvider extends
JobTemplateMetaBaseSQLProvider {}
+
+ static class JobTemplateMetaH2Provider extends
JobTemplateMetaBaseSQLProvider {}
+
+ public static String insertJobTemplateMeta(
+ @Param("jobTemplateMeta") JobTemplatePO jobTemplatePO) {
+ return getProvider().insertJobTemplateMeta(jobTemplatePO);
+ }
+
+ public static String insertJobTemplateMetaOnDuplicateKeyUpdate(
+ @Param("jobTemplateMeta") JobTemplatePO jobTemplatePO) {
+ return
getProvider().insertJobTemplateMetaOnDuplicateKeyUpdate(jobTemplatePO);
+ }
+
+ public static String listJobTemplatePOsByMetalake(@Param("metalakeName")
String metalakeName) {
+ return getProvider().listJobTemplatePOsByMetalake(metalakeName);
+ }
+
+ public static String selectJobTemplatePOByMetalakeAndName(
+ @Param("metalakeName") String metalakeName,
+ @Param("jobTemplateName") String jobTemplateName) {
+ return getProvider().selectJobTemplatePOByMetalakeAndName(metalakeName,
jobTemplateName);
+ }
+
+ public static String softDeleteJobTemplateMetaByMetalakeAndName(
+ @Param("metalakeName") String metalakeName,
+ @Param("jobTemplateName") String jobTemplateName) {
+ return
getProvider().softDeleteJobTemplateMetaByMetalakeAndName(metalakeName,
jobTemplateName);
+ }
+
+ public static String softDeleteJobTemplateMetasByMetalakeId(
+ @Param("metalakeId") Long metalakeId) {
+ return getProvider().softDeleteJobTemplateMetasByMetalakeId(metalakeId);
+ }
+
+ public static String deleteJobTemplateMetasByLegacyTimeline(
+ @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit)
{
+ return
getProvider().deleteJobTemplateMetasByLegacyTimeline(legacyTimeline, limit);
+ }
+}
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/JobMetaBaseSQLProvider.java
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/JobMetaBaseSQLProvider.java
new file mode 100644
index 0000000000..03abcfcdc5
--- /dev/null
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/JobMetaBaseSQLProvider.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.mapper.provider.base;
+
+import org.apache.gravitino.storage.relational.mapper.JobMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.JobTemplateMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.MetalakeMetaMapper;
+import org.apache.gravitino.storage.relational.po.JobPO;
+import org.apache.ibatis.annotations.Param;
+
+public class JobMetaBaseSQLProvider {
+
+ public String insertJobMeta(@Param("jobMeta") JobPO jobPO) {
+ return "INSERT INTO "
+ + JobMetaMapper.TABLE_NAME
+ + " (job_run_id, job_template_id, metalake_id,"
+ + " job_execution_id, job_run_status, job_finished_at, audit_info,
current_version,"
+ + " last_version, deleted_at)"
+ + " VALUES (#{jobMeta.jobRunId},"
+ + " (SELECT job_template_id FROM "
+ + JobTemplateMetaMapper.TABLE_NAME
+ + " WHERE job_template_name = #{jobMeta.jobTemplateName}"
+ + " AND metalake_id = #{jobMeta.metalakeId} AND deleted_at = 0),"
+ + " #{jobMeta.metalakeId}, #{jobMeta.jobExecutionId},"
+ + " #{jobMeta.jobRunStatus}, #{jobMeta.jobFinishedAt},
#{jobMeta.auditInfo},"
+ + " #{jobMeta.currentVersion}, #{jobMeta.lastVersion},"
+ + " #{jobMeta.deletedAt})";
+ }
+
+ public String insertJobMetaOnDuplicateKeyUpdate(@Param("jobMeta") JobPO
jobPO) {
+ return "INSERT INTO "
+ + JobMetaMapper.TABLE_NAME
+ + " (job_run_id, job_template_id, metalake_id,"
+ + " job_execution_id, job_run_status, job_finished_at, audit_info,
current_version,"
+ + " last_version, deleted_at)"
+ + " VALUES (#{jobMeta.jobRunId},"
+ + " (SELECT job_template_id FROM "
+ + JobTemplateMetaMapper.TABLE_NAME
+ + " WHERE job_template_name = #{jobMeta.jobTemplateName}"
+ + " AND metalake_id = #{jobMeta.metalakeId} AND deleted_at = 0),"
+ + " #{jobMeta.metalakeId}, #{jobMeta.jobExecutionId},"
+ + " #{jobMeta.jobRunStatus}, #{jobMeta.jobFinishedAt},
#{jobMeta.auditInfo},"
+ + " #{jobMeta.currentVersion}, #{jobMeta.lastVersion},"
+ + " #{jobMeta.deletedAt})"
+ + " ON DUPLICATE KEY UPDATE"
+ + " job_template_id = (SELECT job_template_id FROM "
+ + JobTemplateMetaMapper.TABLE_NAME
+ + " WHERE job_template_name = #{jobMeta.jobTemplateName}"
+ + " AND metalake_id = #{jobMeta.metalakeId} AND deleted_at = 0),"
+ + " metalake_id = #{jobMeta.metalakeId},"
+ + " job_execution_id = #{jobMeta.jobExecutionId},"
+ + " job_run_status = #{jobMeta.jobRunStatus},"
+ + " job_finished_at = #{jobMeta.jobFinishedAt},"
+ + " audit_info = #{jobMeta.auditInfo},"
+ + " current_version = #{jobMeta.currentVersion},"
+ + " last_version = #{jobMeta.lastVersion},"
+ + " deleted_at = #{jobMeta.deletedAt}";
+ }
+
+ public String listJobPOsByMetalake(@Param("metalakeName") String
metalakeName) {
+ return "SELECT jrm.job_run_id AS jobRunId, jtm.job_template_name AS
jobTemplateName,"
+ + " jrm.metalake_id AS metalakeId, jrm.job_execution_id AS
jobExecutionId,"
+ + " jrm.job_run_status AS jobRunStatus, jrm.job_finished_at AS
jobFinishedAt,"
+ + " jrm.audit_info AS auditInfo,"
+ + " jrm.current_version AS currentVersion, jrm.last_version AS
lastVersion,"
+ + " jrm.deleted_at AS deletedAt"
+ + " FROM "
+ + JobMetaMapper.TABLE_NAME
+ + " jrm JOIN "
+ + JobTemplateMetaMapper.TABLE_NAME
+ + " jtm ON jrm.job_template_id = jtm.job_template_id"
+ + " JOIN "
+ + MetalakeMetaMapper.TABLE_NAME
+ + " mm ON jrm.metalake_id = mm.metalake_id"
+ + " WHERE mm.metalake_name = #{metalakeName} AND jrm.deleted_at = 0
AND mm.deleted_at = 0"
+ + " AND jtm.deleted_at = 0";
+ }
+
+ public String listJobPOsByMetalakeAndTemplate(
+ @Param("metalakeName") String metalakeName,
+ @Param("jobTemplateName") String jobTemplateName) {
+ return "SELECT jrm.job_run_id AS jobRunId, jtm.job_template_name AS
jobTemplateName,"
+ + " jrm.metalake_id AS metalakeId, jrm.job_execution_id AS
jobExecutionId,"
+ + " jrm.job_run_status AS jobRunStatus, jrm.job_finished_at AS
jobFinishedAt, "
+ + " jrm.audit_info AS auditInfo,"
+ + " jrm.current_version AS currentVersion, jrm.last_version AS
lastVersion,"
+ + " jrm.deleted_at AS deletedAt"
+ + " FROM "
+ + JobMetaMapper.TABLE_NAME
+ + " jrm JOIN "
+ + JobTemplateMetaMapper.TABLE_NAME
+ + " jtm ON jrm.job_template_id = jtm.job_template_id"
+ + " JOIN "
+ + MetalakeMetaMapper.TABLE_NAME
+ + " mm ON jrm.metalake_id = mm.metalake_id"
+ + " WHERE mm.metalake_name = #{metalakeName} AND jtm.job_template_name
= #{jobTemplateName}"
+ + " AND jrm.deleted_at = 0 AND mm.deleted_at = 0 AND jtm.deleted_at =
0";
+ }
+
+ public String selectJobPOByMetalakeAndRunId(
+ @Param("metalakeName") String metalakeName, @Param("jobRunId") Long
jobRunId) {
+ return "SELECT jrm.job_run_id AS jobRunId, jtm.job_template_name AS
jobTemplateName,"
+ + " jrm.metalake_id AS metalakeId, jrm.job_execution_id AS
jobExecutionId,"
+ + " jrm.job_run_status AS jobRunStatus, jrm.job_finished_at AS
jobFinishedAt,"
+ + " jrm.audit_info AS auditInfo,"
+ + " jrm.current_version AS currentVersion, jrm.last_version AS
lastVersion,"
+ + " jrm.deleted_at AS deletedAt"
+ + " FROM "
+ + JobMetaMapper.TABLE_NAME
+ + " jrm JOIN "
+ + JobTemplateMetaMapper.TABLE_NAME
+ + " jtm ON jrm.job_template_id = jtm.job_template_id"
+ + " JOIN "
+ + MetalakeMetaMapper.TABLE_NAME
+ + " mm ON jrm.metalake_id = mm.metalake_id"
+ + " WHERE mm.metalake_name = #{metalakeName} AND jrm.job_run_id =
#{jobRunId}"
+ + " AND jrm.deleted_at = 0 AND mm.deleted_at = 0 AND jtm.deleted_at =
0";
+ }
+
+ public String softDeleteJobMetaByMetalakeAndTemplate(
+ @Param("metalakeName") String metalakeName,
+ @Param("jobTemplateName") String jobTemplateName) {
+ return "UPDATE "
+ + JobMetaMapper.TABLE_NAME
+ + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)"
+ + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000.0"
+ + " WHERE metalake_id = ("
+ + " SELECT metalake_id FROM "
+ + MetalakeMetaMapper.TABLE_NAME
+ + " WHERE metalake_name = #{metalakeName} AND deleted_at = 0)"
+ + " AND job_template_id IN ("
+ + " SELECT job_template_id FROM "
+ + JobTemplateMetaMapper.TABLE_NAME
+ + " WHERE job_template_name = #{jobTemplateName} AND deleted_at = 0)"
+ + " AND deleted_at = 0";
+ }
+
+ public String softDeleteJobMetasByMetalakeId(@Param("metalakeId") Long
metalakeId) {
+ return "UPDATE "
+ + JobMetaMapper.TABLE_NAME
+ + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)"
+ + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000.0"
+ + " WHERE metalake_id = #{metalakeId} AND deleted_at = 0";
+ }
+
+ public String softDeleteJobMetasByLegacyTimeline(@Param("legacyTimeline")
Long legacyTimeline) {
+ return "UPDATE "
+ + JobMetaMapper.TABLE_NAME
+ + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)"
+ + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000.0"
+ + " WHERE job_finished_at < #{legacyTimeline} AND job_finished_at > 0";
+ }
+
+ public String deleteJobMetasByLegacyTimeline(
+ @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit)
{
+ return "DELETE FROM "
+ + JobMetaMapper.TABLE_NAME
+ + " WHERE deleted_at < #{legacyTimeline} AND deleted_at > 0 LIMIT
#{limit}";
+ }
+}
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/JobTemplateMetaBaseSQLProvider.java
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/JobTemplateMetaBaseSQLProvider.java
new file mode 100644
index 0000000000..43111d3efd
--- /dev/null
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/JobTemplateMetaBaseSQLProvider.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.mapper.provider.base;
+
+import org.apache.gravitino.storage.relational.mapper.JobTemplateMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.MetalakeMetaMapper;
+import org.apache.gravitino.storage.relational.po.JobTemplatePO;
+import org.apache.ibatis.annotations.Param;
+
+public class JobTemplateMetaBaseSQLProvider {
+
+ public String insertJobTemplateMeta(@Param("jobTemplateMeta") JobTemplatePO
jobTemplatePO) {
+ return "INSERT INTO "
+ + JobTemplateMetaMapper.TABLE_NAME
+ + " (job_template_id, job_template_name, metalake_id,"
+ + " job_template_comment, job_template_content, audit_info,"
+ + " current_version, last_version, deleted_at)"
+ + " VALUES (#{jobTemplateMeta.jobTemplateId},
#{jobTemplateMeta.jobTemplateName},"
+ + " #{jobTemplateMeta.metalakeId},
#{jobTemplateMeta.jobTemplateComment},"
+ + " #{jobTemplateMeta.jobTemplateContent},
#{jobTemplateMeta.auditInfo},"
+ + " #{jobTemplateMeta.currentVersion}, #{jobTemplateMeta.lastVersion},"
+ + " #{jobTemplateMeta.deletedAt})";
+ }
+
+ public String insertJobTemplateMetaOnDuplicateKeyUpdate(
+ @Param("jobTemplateMeta") JobTemplatePO jobTemplatePO) {
+ return "INSERT INTO "
+ + JobTemplateMetaMapper.TABLE_NAME
+ + " (job_template_id, job_template_name, metalake_id,"
+ + " job_template_comment, job_template_content, audit_info,
current_version,"
+ + " last_version, deleted_at)"
+ + " VALUES (#{jobTemplateMeta.jobTemplateId},
#{jobTemplateMeta.jobTemplateName},"
+ + " #{jobTemplateMeta.metalakeId},
#{jobTemplateMeta.jobTemplateComment},"
+ + " #{jobTemplateMeta.jobTemplateContent},
#{jobTemplateMeta.auditInfo},"
+ + " #{jobTemplateMeta.currentVersion}, #{jobTemplateMeta.lastVersion},"
+ + " #{jobTemplateMeta.deletedAt})"
+ + " ON DUPLICATE KEY UPDATE"
+ + " job_template_name = #{jobTemplateMeta.jobTemplateName},"
+ + " metalake_id = #{jobTemplateMeta.metalakeId},"
+ + " job_template_comment = #{jobTemplateMeta.jobTemplateComment},"
+ + " job_template_content = #{jobTemplateMeta.jobTemplateContent},"
+ + " audit_info = #{jobTemplateMeta.auditInfo},"
+ + " current_version = #{jobTemplateMeta.currentVersion},"
+ + " last_version = #{jobTemplateMeta.lastVersion},"
+ + " deleted_at = #{jobTemplateMeta.deletedAt}";
+ }
+
+ public String listJobTemplatePOsByMetalake(@Param("metalakeName") String
metalakeName) {
+ return "SELECT jtm.job_template_id AS jobTemplateId, jtm.job_template_name
AS jobTemplateName,"
+ + " jtm.metalake_id AS metalakeId, jtm.job_template_comment AS
jobTemplateComment,"
+ + " jtm.job_template_content AS jobTemplateContent, jtm.audit_info AS
auditInfo,"
+ + " jtm.current_version AS currentVersion, jtm.last_version AS
lastVersion,"
+ + " jtm.deleted_at AS deletedAt"
+ + " FROM "
+ + JobTemplateMetaMapper.TABLE_NAME
+ + " jtm JOIN "
+ + MetalakeMetaMapper.TABLE_NAME
+ + " mm ON jtm.metalake_id = mm.metalake_id"
+ + " WHERE mm.metalake_name = #{metalakeName} AND jtm.deleted_at = 0
AND mm.deleted_at = 0";
+ }
+
+ public String selectJobTemplatePOByMetalakeAndName(
+ @Param("metalakeName") String metalakeName,
+ @Param("jobTemplateName") String jobTemplateName) {
+ return "SELECT jtm.job_template_id AS jobTemplateId, jtm.job_template_name
AS jobTemplateName,"
+ + " jtm.metalake_id AS metalakeId, jtm.job_template_comment AS
jobTemplateComment,"
+ + " jtm.job_template_content AS jobTemplateContent, jtm.audit_info AS
auditInfo,"
+ + " jtm.current_version AS currentVersion, jtm.last_version AS
lastVersion,"
+ + " jtm.deleted_at AS deletedAt"
+ + " FROM "
+ + JobTemplateMetaMapper.TABLE_NAME
+ + " jtm JOIN "
+ + MetalakeMetaMapper.TABLE_NAME
+ + " mm ON jtm.metalake_id = mm.metalake_id"
+ + " WHERE mm.metalake_name = #{metalakeName} AND jtm.job_template_name
= #{jobTemplateName}"
+ + " AND jtm.deleted_at = 0 AND mm.deleted_at = 0";
+ }
+
+ public String softDeleteJobTemplateMetaByMetalakeAndName(
+ @Param("metalakeName") String metalakeName,
+ @Param("jobTemplateName") String jobTemplateName) {
+ return "UPDATE "
+ + JobTemplateMetaMapper.TABLE_NAME
+ + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)"
+ + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000.0"
+ + " WHERE job_template_name = #{jobTemplateName} AND metalake_id ="
+ + " (SELECT metalake_id FROM "
+ + MetalakeMetaMapper.TABLE_NAME
+ + " WHERE metalake_name = #{metalakeName} AND deleted_at = 0)"
+ + " AND deleted_at = 0";
+ }
+
+ public String softDeleteJobTemplateMetasByMetalakeId(@Param("metalakeId")
Long metalakeId) {
+ return "UPDATE "
+ + JobTemplateMetaMapper.TABLE_NAME
+ + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)"
+ + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000.0"
+ + " WHERE metalake_id = #{metalakeId} AND deleted_at = 0";
+ }
+
+ public String deleteJobTemplateMetasByLegacyTimeline(
+ @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit)
{
+ return "DELETE FROM "
+ + JobTemplateMetaMapper.TABLE_NAME
+ + " WHERE deleted_at < #{legacyTimeline} AND deleted_at > 0 LIMIT
#{limit}";
+ }
+}
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/JobMetaPostgreSQLProvider.java
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/JobMetaPostgreSQLProvider.java
new file mode 100644
index 0000000000..56e667d274
--- /dev/null
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/JobMetaPostgreSQLProvider.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.mapper.provider.postgresql;
+
+import org.apache.gravitino.storage.relational.mapper.JobMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.JobTemplateMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.MetalakeMetaMapper;
+import
org.apache.gravitino.storage.relational.mapper.provider.base.JobMetaBaseSQLProvider;
+import org.apache.gravitino.storage.relational.po.JobPO;
+import org.apache.ibatis.annotations.Param;
+
+public class JobMetaPostgreSQLProvider extends JobMetaBaseSQLProvider {
+
+ @Override
+ public String insertJobMetaOnDuplicateKeyUpdate(@Param("jobMeta") JobPO
jobPO) {
+ return "INSERT INTO "
+ + JobMetaMapper.TABLE_NAME
+ + " (job_run_id, job_template_id, metalake_id,"
+ + " job_execution_id, job_run_status, job_finished_at, audit_info,
current_version,"
+ + " last_version, deleted_at)"
+ + " VALUES (#{jobMeta.jobRunId},"
+ + " (SELECT job_template_id FROM "
+ + JobTemplateMetaMapper.TABLE_NAME
+ + " WHERE job_template_name = #{jobMeta.jobTemplateName}"
+ + " AND metalake_id = #{jobMeta.metalakeId} AND deleted_at = 0),"
+ + " #{jobMeta.metalakeId}, #{jobMeta.jobExecutionId},"
+ + " #{jobMeta.jobRunStatus}, #{jobMeta.jobFinished},
#{jobMeta.auditInfo},"
+ + " #{jobMeta.currentVersion}, #{jobMeta.lastVersion},"
+ + " #{jobMeta.deletedAt})"
+ + " ON CONFLICT (job_run_id) DO UPDATE SET"
+ + " job_template_id = (SELECT job_template_id FROM "
+ + JobTemplateMetaMapper.TABLE_NAME
+ + " WHERE job_template_name = #{jobMeta.jobTemplateName}"
+ + " AND metalake_id = #{jobMeta.metalakeId} AND deleted_at = 0),"
+ + " metalake_id = #{jobMeta.metalakeId},"
+ + " job_execution_id = #{jobMeta.jobExecutionId},"
+ + " job_run_status = #{jobMeta.jobRunStatus},"
+ + " job_finished_at = #{jobMeta.jobFinishedAt},"
+ + " audit_info = #{jobMeta.auditInfo},"
+ + " current_version = #{jobMeta.currentVersion},"
+ + " last_version = #{jobMeta.lastVersion},"
+ + " deleted_at = #{jobMeta.deletedAt}";
+ }
+
+ @Override
+ public String softDeleteJobMetaByMetalakeAndTemplate(
+ @Param("metalakeName") String metalakeName,
+ @Param("jobTemplateName") String jobTemplateName) {
+ return "UPDATE "
+ + JobMetaMapper.TABLE_NAME
+ + " SET deleted_at = floor(extract(epoch from((current_timestamp -"
+ + " timestamp '1970-01-01 00:00:00')*1000))) "
+ + " WHERE metalake_id IN ("
+ + " SELECT metalake_id FROM "
+ + MetalakeMetaMapper.TABLE_NAME
+ + " WHERE metalake_name = #{metalakeName} AND deleted_at = 0)"
+ + " AND job_template_id IN ("
+ + " SELECT job_template_id FROM "
+ + JobTemplateMetaMapper.TABLE_NAME
+ + " WHERE job_template_name = #{jobTemplateName} AND deleted_at = 0)"
+ + " AND deleted_at = 0";
+ }
+
+ @Override
+ public String softDeleteJobMetasByMetalakeId(@Param("metalakeId") Long
metalakeId) {
+ return "UPDATE "
+ + JobMetaMapper.TABLE_NAME
+ + " SET deleted_at = floor(extract(epoch from((current_timestamp -"
+ + " timestamp '1970-01-01 00:00:00')*1000))) "
+ + " WHERE metalake_id = #{metalakeId} AND deleted_at = 0";
+ }
+
+ @Override
+ public String softDeleteJobMetasByLegacyTimeline(@Param("legacyTimeline")
Long legacyTimeline) {
+ return "UPDATE "
+ + JobMetaMapper.TABLE_NAME
+ + " SET deleted_at = floor(extract(epoch from((current_timestamp -"
+ + " timestamp '1970-01-01 00:00:00')*1000))) "
+ + " WHERE job_finished_at < #{legacyTimeline} AND job_finished_at > 0";
+ }
+
+ @Override
+ public String deleteJobMetasByLegacyTimeline(
+ @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit)
{
+ return "DELETE FROM "
+ + JobMetaMapper.TABLE_NAME
+ + " WHERE job_run_id IN (SELECT job_run_id FROM "
+ + JobMetaMapper.TABLE_NAME
+ + " WHERE deleted_at < #{legacyTimeline} AND deleted_at > 0 LIMIT
#{limit})";
+ }
+}
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/JobTemplateMetaPostgreSQLProvider.java
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/JobTemplateMetaPostgreSQLProvider.java
new file mode 100644
index 0000000000..e0c9557319
--- /dev/null
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/JobTemplateMetaPostgreSQLProvider.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.mapper.provider.postgresql;
+
+import org.apache.gravitino.storage.relational.mapper.JobTemplateMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.MetalakeMetaMapper;
+import
org.apache.gravitino.storage.relational.mapper.provider.base.JobTemplateMetaBaseSQLProvider;
+import org.apache.gravitino.storage.relational.po.JobTemplatePO;
+import org.apache.ibatis.annotations.Param;
+
+public class JobTemplateMetaPostgreSQLProvider extends
JobTemplateMetaBaseSQLProvider {
+
+ @Override
+ public String softDeleteJobTemplateMetaByMetalakeAndName(
+ @Param("metalakeName") String metalakeName,
+ @Param("jobTemplateName") String jobTemplateName) {
+ return "UPDATE "
+ + JobTemplateMetaMapper.TABLE_NAME
+ + " SET deleted_at = floor(extract(epoch from((current_timestamp -"
+ + " timestamp '1970-01-01 00:00:00')*1000))) "
+ + " WHERE metalake_id IN ("
+ + " SELECT metalake_id FROM "
+ + MetalakeMetaMapper.TABLE_NAME
+ + " WHERE metalake_name = #{metalakeName} AND deleted_at = 0)"
+ + " AND job_template_name = #{jobTemplateName} AND deleted_at = 0";
+ }
+
+ @Override
+ public String softDeleteJobTemplateMetasByMetalakeId(@Param("metalakeId")
Long metalakeId) {
+ return "UPDATE "
+ + JobTemplateMetaMapper.TABLE_NAME
+ + " SET deleted_at = floor(extract(epoch from((current_timestamp -"
+ + " timestamp '1970-01-01 00:00:00')*1000))) "
+ + " WHERE metalake_id = #{metalakeId} AND deleted_at = 0";
+ }
+
+ @Override
+ public String insertJobTemplateMetaOnDuplicateKeyUpdate(
+ @Param("jobTemplateMeta") JobTemplatePO jobTemplatePO) {
+ return "INSERT INTO "
+ + JobTemplateMetaMapper.TABLE_NAME
+ + " (job_template_id, job_template_name, metalake_id,
job_template_comment, "
+ + " job_template_content, audit_info, current_version, last_version,
deleted_at)"
+ + " VALUES (#{jobTemplateMeta.jobTemplateId},
#{jobTemplateMeta.jobTemplateName},"
+ + " #{jobTemplateMeta.metalakeId},
#{jobTemplateMeta.jobTemplateComment},"
+ + " #{jobTemplateMeta.jobTemplateContent},
#{jobTemplateMeta.auditInfo},"
+ + " #{jobTemplateMeta.currentVersion}, #{jobTemplateMeta.lastVersion},"
+ + " #{jobTemplateMeta.deletedAt})"
+ + " ON CONFLICT(job_template_id) DO UPDATE SET"
+ + " job_template_name = #{jobTemplateMeta.jobTemplateName},"
+ + " metalake_id = #{jobTemplateMeta.metalakeId},"
+ + " job_template_comment = #{jobTemplateMeta.jobTemplateComment},"
+ + " job_template_content = #{jobTemplateMeta.jobTemplateContent},"
+ + " audit_info = #{jobTemplateMeta.auditInfo},"
+ + " current_version = #{jobTemplateMeta.currentVersion},"
+ + " last_version = #{jobTemplateMeta.lastVersion},"
+ + " deleted_at = #{jobTemplateMeta.deletedAt}";
+ }
+
+ @Override
+ public String deleteJobTemplateMetasByLegacyTimeline(Long legacyTimeline,
int limit) {
+ return "DELETE FROM "
+ + JobTemplateMetaMapper.TABLE_NAME
+ + " WHERE job_template_id IN (SELECT job_template_id FROM "
+ + JobTemplateMetaMapper.TABLE_NAME
+ + " WHERE deleted_at < #{legacyTimeline} AND deleted_at > 0 LIMIT
#{limit})";
+ }
+}
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/po/JobPO.java
b/core/src/main/java/org/apache/gravitino/storage/relational/po/JobPO.java
new file mode 100644
index 0000000000..236a1eb6c3
--- /dev/null
+++ b/core/src/main/java/org/apache/gravitino/storage/relational/po/JobPO.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.po;
+
+import static
org.apache.gravitino.storage.relational.utils.POConverters.DEFAULT_DELETED_AT;
+import static
org.apache.gravitino.storage.relational.utils.POConverters.INIT_VERSION;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.base.Preconditions;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.ToString;
+import lombok.experimental.Accessors;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.job.JobHandle;
+import org.apache.gravitino.json.JsonUtils;
+import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.meta.JobEntity;
+
+@Getter
+@Accessors(fluent = true)
+@EqualsAndHashCode
+@ToString
+public class JobPO {
+
+ private Long jobRunId;
+ private String jobTemplateName;
+ private Long metalakeId;
+ private String jobExecutionId;
+ private String jobRunStatus;
+ private Long jobFinishedAt;
+ private String auditInfo;
+ private Long currentVersion;
+ private Long lastVersion;
+ private Long deletedAt;
+
+ public JobPO() {
+ // Default constructor for JPA
+ }
+
+ @lombok.Builder(setterPrefix = "with")
+ private JobPO(
+ Long jobRunId,
+ String jobTemplateName,
+ Long metalakeId,
+ String jobExecutionId,
+ String jobRunStatus,
+ Long jobFinishedAt,
+ String auditInfo,
+ Long currentVersion,
+ Long lastVersion,
+ Long deletedAt) {
+ Preconditions.checkArgument(jobRunId != null, "jobRunId cannot be null");
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(jobTemplateName), "jobTemplateName cannot be
blank");
+ Preconditions.checkArgument(metalakeId != null, "metalakeId cannot be
null");
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(jobExecutionId), "jobExecutionId cannot be
blank");
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(jobRunStatus), "jobRunStatus cannot be blank");
+ Preconditions.checkArgument(jobFinishedAt != null, "jobFinishedAt cannot
be null");
+ Preconditions.checkArgument(StringUtils.isNotBlank(auditInfo), "auditInfo
cannot be blank");
+ Preconditions.checkArgument(currentVersion != null, "currentVersion cannot
be null");
+ Preconditions.checkArgument(lastVersion != null, "lastVersion cannot be
null");
+ Preconditions.checkArgument(deletedAt != null, "deletedAt cannot be null");
+
+ this.jobRunId = jobRunId;
+ this.jobTemplateName = jobTemplateName;
+ this.metalakeId = metalakeId;
+ this.jobExecutionId = jobExecutionId;
+ this.jobRunStatus = jobRunStatus;
+ this.jobFinishedAt = jobFinishedAt;
+ this.auditInfo = auditInfo;
+ this.currentVersion = currentVersion;
+ this.lastVersion = lastVersion;
+ this.deletedAt = deletedAt;
+ }
+
+ public static class JobPOBuilder {
+ // Builder class for JobPO
+ // Lombok will generate the builder methods based on the fields defined in
JobPO
+ }
+
+ public static JobPO initializeJobPO(JobEntity jobEntity, JobPOBuilder
builder) {
+ // We should not keep the terminated job entities in the database forever,
so we set the
+ // current time as the finished timestamp if the job is in a terminal
state,
+ // So the entity GC cleaner will clean it up later.
+ long finished = DEFAULT_DELETED_AT;
+ if (jobEntity.status() == JobHandle.Status.CANCELLED
+ || jobEntity.status() == JobHandle.Status.FAILED
+ || jobEntity.status() == JobHandle.Status.SUCCEEDED) {
+ finished = System.currentTimeMillis();
+ }
+
+ try {
+ return builder
+ .withJobRunId(jobEntity.id())
+ .withJobTemplateName(jobEntity.jobTemplateName())
+ .withJobExecutionId(jobEntity.jobExecutionId())
+ .withJobRunStatus(jobEntity.status().name())
+ .withJobFinishedAt(finished)
+
.withAuditInfo(JsonUtils.anyFieldMapper().writeValueAsString(jobEntity.auditInfo()))
+ .withCurrentVersion(INIT_VERSION)
+ .withLastVersion(INIT_VERSION)
+ .withDeletedAt(DEFAULT_DELETED_AT)
+ .build();
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException("Failed to serialize job entity", e);
+ }
+ }
+
+ public static JobEntity fromJobPO(JobPO jobPO, Namespace namespace) {
+ try {
+ return JobEntity.builder()
+ .withId(jobPO.jobRunId)
+ .withJobExecutionId(jobPO.jobExecutionId)
+ .withNamespace(namespace)
+ .withStatus(JobHandle.Status.valueOf(jobPO.jobRunStatus))
+ .withJobTemplateName(jobPO.jobTemplateName)
+ .withAuditInfo(JsonUtils.anyFieldMapper().readValue(jobPO.auditInfo,
AuditInfo.class))
+ .build();
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException("Failed to deserialize job PO", e);
+ }
+ }
+}
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/po/JobTemplatePO.java
b/core/src/main/java/org/apache/gravitino/storage/relational/po/JobTemplatePO.java
new file mode 100644
index 0000000000..6efd8fb141
--- /dev/null
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/po/JobTemplatePO.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.po;
+
+import static
org.apache.gravitino.storage.relational.utils.POConverters.DEFAULT_DELETED_AT;
+import static
org.apache.gravitino.storage.relational.utils.POConverters.INIT_VERSION;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.base.Preconditions;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.ToString;
+import lombok.experimental.Accessors;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.json.JsonUtils;
+import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.meta.JobTemplateEntity;
+
+@Getter
+@Accessors(fluent = true)
+@EqualsAndHashCode
+@ToString
+public class JobTemplatePO {
+
+ private Long jobTemplateId;
+ private String jobTemplateName;
+ private Long metalakeId;
+ private String jobTemplateComment;
+ private String jobTemplateContent;
+ private String auditInfo;
+ private Long currentVersion;
+ private Long lastVersion;
+ private Long deletedAt;
+
+ public JobTemplatePO() {
+ // Default constructor for JPA
+ }
+
+ @lombok.Builder(setterPrefix = "with")
+ private JobTemplatePO(
+ Long jobTemplateId,
+ String jobTemplateName,
+ Long metalakeId,
+ String jobTemplateComment,
+ String jobTemplateContent,
+ String auditInfo,
+ Long currentVersion,
+ Long lastVersion,
+ Long deletedAt) {
+ Preconditions.checkArgument(jobTemplateId != null, "jobTemplateId cannot
be null");
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(jobTemplateName), "jobTemplateName cannot be
blank");
+ Preconditions.checkArgument(metalakeId != null, "metalakeId cannot be
null");
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(jobTemplateContent), "jobTemplateContent cannot
be blank");
+ Preconditions.checkArgument(StringUtils.isNotBlank(auditInfo), "auditInfo
cannot be blank");
+ Preconditions.checkArgument(currentVersion != null, "currentVersion cannot
be null");
+ Preconditions.checkArgument(lastVersion != null, "lastVersion cannot be
null");
+ Preconditions.checkArgument(deletedAt != null, "deletedAt cannot be null");
+
+ this.jobTemplateId = jobTemplateId;
+ this.jobTemplateName = jobTemplateName;
+ this.metalakeId = metalakeId;
+ this.jobTemplateComment = jobTemplateComment;
+ this.jobTemplateContent = jobTemplateContent;
+ this.auditInfo = auditInfo;
+ this.currentVersion = currentVersion;
+ this.lastVersion = lastVersion;
+ this.deletedAt = deletedAt;
+ }
+
+ public static class JobTemplatePOBuilder {
+ // Builder class for JobTemplatePO
+ // Lombok will generate the builder methods based on the fields defined in
JobTemplatePO
+ }
+
+ public static JobTemplatePO initializeJobTemplatePO(
+ JobTemplateEntity jobTemplateEntity, JobTemplatePOBuilder builder) {
+ try {
+ return builder
+ .withJobTemplateId(jobTemplateEntity.id())
+ .withJobTemplateName(jobTemplateEntity.name())
+ .withJobTemplateComment(jobTemplateEntity.comment())
+ .withJobTemplateContent(
+
JsonUtils.anyFieldMapper().writeValueAsString(jobTemplateEntity.templateContent()))
+ .withAuditInfo(
+
JsonUtils.anyFieldMapper().writeValueAsString(jobTemplateEntity.auditInfo()))
+ .withCurrentVersion(INIT_VERSION)
+ .withLastVersion(INIT_VERSION)
+ .withDeletedAt(DEFAULT_DELETED_AT)
+ .build();
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException("Failed to serialize job template entity", e);
+ }
+ }
+
+ public static JobTemplateEntity fromJobTemplatePO(
+ JobTemplatePO jobTemplatePO, Namespace namespace) {
+ try {
+ return JobTemplateEntity.builder()
+ .withId(jobTemplatePO.jobTemplateId())
+ .withName(jobTemplatePO.jobTemplateName())
+ .withNamespace(namespace)
+ .withComment(jobTemplatePO.jobTemplateComment())
+ .withTemplateContent(
+ JsonUtils.anyFieldMapper()
+ .readValue(
+ jobTemplatePO.jobTemplateContent(),
JobTemplateEntity.TemplateContent.class))
+ .withAuditInfo(
+ JsonUtils.anyFieldMapper().readValue(jobTemplatePO.auditInfo(),
AuditInfo.class))
+ .build();
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException("Failed to deserialize job template PO", e);
+ }
+ }
+}
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/service/JobMetaService.java
b/core/src/main/java/org/apache/gravitino/storage/relational/service/JobMetaService.java
new file mode 100644
index 0000000000..ebc9c70760
--- /dev/null
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/service/JobMetaService.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.service;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Locale;
+import java.util.stream.Collectors;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.exceptions.NoSuchEntityException;
+import org.apache.gravitino.job.JobHandle;
+import org.apache.gravitino.meta.JobEntity;
+import org.apache.gravitino.storage.relational.mapper.JobMetaMapper;
+import org.apache.gravitino.storage.relational.po.JobPO;
+import org.apache.gravitino.storage.relational.utils.ExceptionUtils;
+import org.apache.gravitino.storage.relational.utils.SessionUtils;
+import org.apache.gravitino.utils.NamespaceUtil;
+
+public class JobMetaService {
+
+ private static final JobMetaService INSTANCE = new JobMetaService();
+
+ private JobMetaService() {
+ // Private constructor to prevent instantiation
+ }
+
+ public static JobMetaService getInstance() {
+ return INSTANCE;
+ }
+
+ public List<JobEntity> listJobsByNamespace(Namespace ns) {
+ String metalakeName = ns.level(0);
+ List<JobPO> jobPOs =
+ SessionUtils.getWithoutCommit(
+ JobMetaMapper.class, mapper ->
mapper.listJobPOsByMetalake(metalakeName));
+ return jobPOs.stream().map(po -> JobPO.fromJobPO(po,
ns)).collect(Collectors.toList());
+ }
+
+ public List<JobEntity> listJobsByTemplateIdent(NameIdentifier
jobTemplateIdent) {
+ String metalakeName = jobTemplateIdent.namespace().level(0);
+ String jobTemplateName = jobTemplateIdent.name();
+ List<JobPO> jobPOs =
+ SessionUtils.getWithoutCommit(
+ JobMetaMapper.class,
+ mapper -> mapper.listJobPOsByMetalakeAndTemplate(metalakeName,
jobTemplateName));
+ return jobPOs.stream()
+ .map(po -> JobPO.fromJobPO(po, NamespaceUtil.ofJob(metalakeName)))
+ .collect(Collectors.toList());
+ }
+
+ public JobEntity getJobByIdentifier(NameIdentifier ident) {
+ String metalakeName = ident.namespace().level(0);
+ String jobRunId = ident.name();
+ Long jobRunIdLong;
+ try {
+ jobRunIdLong =
Long.parseLong(jobRunId.substring(JobHandle.JOB_ID_PREFIX.length()));
+ } catch (NumberFormatException e) {
+ throw new NoSuchEntityException("Invalid job run ID format %s",
jobRunId);
+ }
+
+ JobPO jobPO =
+ SessionUtils.getWithoutCommit(
+ JobMetaMapper.class,
+ mapper -> mapper.selectJobPOByMetalakeAndRunId(metalakeName,
jobRunIdLong));
+ if (jobPO == null) {
+ throw new NoSuchEntityException(
+ NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
+ Entity.EntityType.JOB.name().toLowerCase(Locale.ROOT),
+ jobRunId);
+ }
+ return JobPO.fromJobPO(jobPO, ident.namespace());
+ }
+
+ public void insertJob(JobEntity jobEntity, boolean overwrite) throws
IOException {
+ String metalakeName = jobEntity.namespace().level(0);
+
+ try {
+ Long metalakeId =
MetalakeMetaService.getInstance().getMetalakeIdByName(metalakeName);
+
+ JobPO.JobPOBuilder builder = JobPO.builder().withMetalakeId(metalakeId);
+ JobPO jobPO = JobPO.initializeJobPO(jobEntity, builder);
+
+ SessionUtils.doWithCommit(
+ JobMetaMapper.class,
+ mapper -> {
+ if (overwrite) {
+ mapper.insertJobMetaOnDuplicateKeyUpdate(jobPO);
+ } else {
+ mapper.insertJobMeta(jobPO);
+ }
+ });
+ } catch (RuntimeException e) {
+ ExceptionUtils.checkSQLException(e, Entity.EntityType.JOB,
jobEntity.id().toString());
+ }
+ }
+
+ public int deleteJobsByLegacyTimeline(long legacyTimeline, int limit) {
+ // Mark jobs as deleted for finished jobs, so that they can be cleaned up
later
+ SessionUtils.doWithCommit(
+ JobMetaMapper.class, mapper ->
mapper.softDeleteJobMetasByLegacyTimeline(legacyTimeline));
+
+ return SessionUtils.doWithCommitAndFetchResult(
+ JobMetaMapper.class,
+ mapper -> mapper.deleteJobMetasByLegacyTimeline(legacyTimeline,
limit));
+ }
+}
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/service/JobTemplateMetaService.java
b/core/src/main/java/org/apache/gravitino/storage/relational/service/JobTemplateMetaService.java
new file mode 100644
index 0000000000..c8c61cb51f
--- /dev/null
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/service/JobTemplateMetaService.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.service;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Locale;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.exceptions.NoSuchEntityException;
+import org.apache.gravitino.meta.JobTemplateEntity;
+import org.apache.gravitino.storage.relational.mapper.JobMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.JobTemplateMetaMapper;
+import org.apache.gravitino.storage.relational.po.JobTemplatePO;
+import org.apache.gravitino.storage.relational.utils.ExceptionUtils;
+import org.apache.gravitino.storage.relational.utils.SessionUtils;
+
+public class JobTemplateMetaService {
+
+ private static final JobTemplateMetaService INSTANCE = new
JobTemplateMetaService();
+
+ private JobTemplateMetaService() {
+ // Private constructor to prevent instantiation
+ }
+
+ public static JobTemplateMetaService getInstance() {
+ return INSTANCE;
+ }
+
+ public List<JobTemplateEntity> listJobTemplatesByNamespace(Namespace ns) {
+ String metalakeName = ns.level(0);
+ List<JobTemplatePO> jobTemplatePOs =
+ SessionUtils.getWithoutCommit(
+ JobTemplateMetaMapper.class,
+ mapper -> mapper.listJobTemplatePOsByMetalake(metalakeName));
+
+ return jobTemplatePOs.stream()
+ .map(p -> JobTemplatePO.fromJobTemplatePO(p, ns))
+ .collect(Collectors.toList());
+ }
+
+ public JobTemplateEntity getJobTemplateByIdentifier(NameIdentifier
jobTemplateIdent) {
+ String metalakeName = jobTemplateIdent.namespace().level(0);
+ String jobTemplateName = jobTemplateIdent.name();
+
+ JobTemplatePO jobTemplatePO =
+ SessionUtils.getWithoutCommit(
+ JobTemplateMetaMapper.class,
+ mapper ->
mapper.selectJobTemplatePOByMetalakeAndName(metalakeName, jobTemplateName));
+
+ if (jobTemplatePO == null) {
+ throw new NoSuchEntityException(
+ NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
+ Entity.EntityType.JOB_TEMPLATE.name().toLowerCase(Locale.ROOT),
+ jobTemplateName);
+ }
+
+ return JobTemplatePO.fromJobTemplatePO(jobTemplatePO,
jobTemplateIdent.namespace());
+ }
+
+ public void insertJobTemplate(JobTemplateEntity jobTemplateEntity, boolean
overwrite)
+ throws IOException {
+ String metalakeName = jobTemplateEntity.namespace().level(0);
+
+ try {
+ Long metalakeId =
MetalakeMetaService.getInstance().getMetalakeIdByName(metalakeName);
+
+ JobTemplatePO.JobTemplatePOBuilder builder =
+ JobTemplatePO.builder().withMetalakeId(metalakeId);
+ JobTemplatePO jobTemplatePO =
+ JobTemplatePO.initializeJobTemplatePO(jobTemplateEntity, builder);
+
+ SessionUtils.doWithCommit(
+ JobTemplateMetaMapper.class,
+ mapper -> {
+ if (overwrite) {
+ mapper.insertJobTemplateMetaOnDuplicateKeyUpdate(jobTemplatePO);
+ } else {
+ mapper.insertJobTemplateMeta(jobTemplatePO);
+ }
+ });
+ } catch (RuntimeException e) {
+ ExceptionUtils.checkSQLException(e, Entity.EntityType.JOB_TEMPLATE,
jobTemplateEntity.name());
+ throw e;
+ }
+ }
+
+ public boolean deleteJobTemplate(NameIdentifier jobTemplateIdent) {
+ String metalakeName = jobTemplateIdent.namespace().level(0);
+ String jobTemplateName = jobTemplateIdent.name();
+
+ AtomicInteger result = new AtomicInteger(0);
+ SessionUtils.doMultipleWithCommit(
+ () ->
+ SessionUtils.doWithoutCommit(
+ JobMetaMapper.class,
+ mapper ->
+
mapper.softDeleteJobMetaByMetalakeAndTemplate(metalakeName, jobTemplateName)),
+ () ->
+ result.set(
+ SessionUtils.doWithoutCommitAndFetchResult(
+ JobTemplateMetaMapper.class,
+ mapper ->
+ mapper.softDeleteJobTemplateMetaByMetalakeAndName(
+ metalakeName, jobTemplateName))));
+ return result.get() > 0;
+ }
+
+ public int deleteJobTemplatesByLegacyTimeline(long legacyTimeline, int
limit) {
+ return SessionUtils.doWithCommitAndFetchResult(
+ JobTemplateMetaMapper.class,
+ mapper ->
mapper.deleteJobTemplateMetasByLegacyTimeline(legacyTimeline, limit));
+ }
+}
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/service/MetalakeMetaService.java
b/core/src/main/java/org/apache/gravitino/storage/relational/service/MetalakeMetaService.java
index ac7b32306b..6c2b6b7c57 100644
---
a/core/src/main/java/org/apache/gravitino/storage/relational/service/MetalakeMetaService.java
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/service/MetalakeMetaService.java
@@ -36,6 +36,8 @@ import
org.apache.gravitino.storage.relational.mapper.FilesetMetaMapper;
import org.apache.gravitino.storage.relational.mapper.FilesetVersionMapper;
import org.apache.gravitino.storage.relational.mapper.GroupMetaMapper;
import org.apache.gravitino.storage.relational.mapper.GroupRoleRelMapper;
+import org.apache.gravitino.storage.relational.mapper.JobMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.JobTemplateMetaMapper;
import org.apache.gravitino.storage.relational.mapper.MetalakeMetaMapper;
import org.apache.gravitino.storage.relational.mapper.ModelMetaMapper;
import
org.apache.gravitino.storage.relational.mapper.ModelVersionAliasRelMapper;
@@ -268,7 +270,15 @@ public class MetalakeMetaService {
() ->
SessionUtils.doWithoutCommit(
ModelMetaMapper.class,
- mapper ->
mapper.softDeleteModelMetasByMetalakeId(metalakeId)));
+ mapper ->
mapper.softDeleteModelMetasByMetalakeId(metalakeId)),
+ () ->
+ SessionUtils.doWithoutCommit(
+ JobTemplateMetaMapper.class,
+ mapper ->
mapper.softDeleteJobTemplateMetasByMetalakeId(metalakeId)),
+ () ->
+ SessionUtils.doWithoutCommit(
+ JobMetaMapper.class,
+ mapper ->
mapper.softDeleteJobMetasByMetalakeId(metalakeId)));
} else {
List<CatalogEntity> catalogEntities =
CatalogMetaService.getInstance()
@@ -317,7 +327,15 @@ public class MetalakeMetaService {
() ->
SessionUtils.doWithoutCommit(
OwnerMetaMapper.class,
- mapper ->
mapper.softDeleteOwnerRelByMetalakeId(metalakeId)));
+ mapper ->
mapper.softDeleteOwnerRelByMetalakeId(metalakeId)),
+ () ->
+ SessionUtils.doWithoutCommit(
+ JobTemplateMetaMapper.class,
+ mapper ->
mapper.softDeleteJobTemplateMetasByMetalakeId(metalakeId)),
+ () ->
+ SessionUtils.doWithoutCommit(
+ JobMetaMapper.class,
+ mapper ->
mapper.softDeleteJobMetasByMetalakeId(metalakeId)));
}
}
return true;
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/utils/POConverters.java
b/core/src/main/java/org/apache/gravitino/storage/relational/utils/POConverters.java
index b295ddf110..7d00cbd947 100644
---
a/core/src/main/java/org/apache/gravitino/storage/relational/utils/POConverters.java
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/utils/POConverters.java
@@ -93,8 +93,8 @@ import org.apache.gravitino.utils.PrincipalUtils;
/** POConverters is a utility class to convert PO to Base and vice versa. */
public class POConverters {
- private static final long INIT_VERSION = 1L;
- private static final long DEFAULT_DELETED_AT = 0L;
+ public static final long INIT_VERSION = 1L;
+ public static final long DEFAULT_DELETED_AT = 0L;
private POConverters() {}
diff --git
a/core/src/test/java/org/apache/gravitino/storage/relational/po/TestJobPO.java
b/core/src/test/java/org/apache/gravitino/storage/relational/po/TestJobPO.java
new file mode 100644
index 0000000000..3fd8f9988b
--- /dev/null
+++
b/core/src/test/java/org/apache/gravitino/storage/relational/po/TestJobPO.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.po;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import java.time.Instant;
+import org.apache.gravitino.job.JobHandle;
+import org.apache.gravitino.job.JobTemplate;
+import org.apache.gravitino.job.ShellJobTemplate;
+import org.apache.gravitino.job.SparkJobTemplate;
+import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.meta.JobEntity;
+import org.apache.gravitino.meta.JobTemplateEntity;
+import org.apache.gravitino.utils.NamespaceUtil;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class TestJobPO {
+
+ @Test
+ public void testJobTemplatePO() {
+ JobTemplate shellJobTemplate =
+ ShellJobTemplate.builder()
+ .withName("shell-job-template")
+ .withComment("This is a shell job template")
+ .withExecutable("/bin/echo")
+ .withArguments(Lists.newArrayList("Hello, World!"))
+ .withEnvironments(ImmutableMap.of("ENV_VAR", "value"))
+ .withScripts(Lists.newArrayList("/path/to/script.sh"))
+ .build();
+
+ JobTemplateEntity shellTemplateEntity =
+ JobTemplateEntity.builder()
+ .withName(shellJobTemplate.name())
+ .withTemplateContent(
+
JobTemplateEntity.TemplateContent.fromJobTemplate(shellJobTemplate))
+ .withComment(shellJobTemplate.comment())
+ .withNamespace(NamespaceUtil.ofJobTemplate("test"))
+ .withId(1L)
+ .withAuditInfo(
+
AuditInfo.builder().withCreator("test").withCreateTime(Instant.now()).build())
+ .build();
+
+ JobTemplatePO.JobTemplatePOBuilder builder =
JobTemplatePO.builder().withMetalakeId(1L);
+
+ JobTemplatePO jobTemplatePO =
+ JobTemplatePO.initializeJobTemplatePO(shellTemplateEntity, builder);
+ JobTemplateEntity resultEntity =
+ JobTemplatePO.fromJobTemplatePO(jobTemplatePO,
NamespaceUtil.ofJobTemplate("test"));
+
+ Assertions.assertEquals(shellTemplateEntity.name(), resultEntity.name());
+ Assertions.assertEquals(shellTemplateEntity.templateContent(),
resultEntity.templateContent());
+ Assertions.assertEquals(shellTemplateEntity.comment(),
resultEntity.comment());
+ Assertions.assertEquals(shellTemplateEntity.namespace(),
resultEntity.namespace());
+ Assertions.assertEquals(shellTemplateEntity.id(), resultEntity.id());
+ Assertions.assertEquals(
+ shellTemplateEntity.auditInfo().creator(),
resultEntity.auditInfo().creator());
+
+ JobTemplate sparkJobTemplate =
+ SparkJobTemplate.builder()
+ .withName("spark-job-template")
+ .withComment("This is a spark job template")
+ .withExecutable("/path/to/spark-demo.jar")
+ .withClassName("org.example.SparkDemo")
+ .withArguments(
+ Lists.newArrayList("--input", "/path/to/input", "--output",
"/path/to/output"))
+ .withEnvironments(ImmutableMap.of("SPARK_ENV_VAR", "value"))
+ .withConfigs(ImmutableMap.of("spark.executor.memory", "2g"))
+ .withJars(Lists.newArrayList("/path/to/dependency.jar"))
+ .withFiles(Lists.newArrayList("/path/to/config.yaml"))
+ .withArchives(Lists.newArrayList("/path/to/archive.zip"))
+ .build();
+
+ JobTemplateEntity sparkTemplateEntity =
+ JobTemplateEntity.builder()
+ .withName(sparkJobTemplate.name())
+ .withTemplateContent(
+
JobTemplateEntity.TemplateContent.fromJobTemplate(sparkJobTemplate))
+ .withComment(sparkJobTemplate.comment())
+ .withNamespace(NamespaceUtil.ofJobTemplate("test"))
+ .withId(2L)
+ .withAuditInfo(
+
AuditInfo.builder().withCreator("test").withCreateTime(Instant.now()).build())
+ .build();
+
+ JobTemplatePO.JobTemplatePOBuilder builder2 =
JobTemplatePO.builder().withMetalakeId(1L);
+ JobTemplatePO sparkJobTemplatePO =
+ JobTemplatePO.initializeJobTemplatePO(sparkTemplateEntity, builder2);
+
+ JobTemplateEntity resultSparkEntity =
+ JobTemplatePO.fromJobTemplatePO(sparkJobTemplatePO,
NamespaceUtil.ofJobTemplate("test"));
+
+ Assertions.assertEquals(sparkTemplateEntity.name(),
resultSparkEntity.name());
+ Assertions.assertEquals(
+ sparkTemplateEntity.templateContent(),
resultSparkEntity.templateContent());
+ Assertions.assertEquals(sparkTemplateEntity.comment(),
resultSparkEntity.comment());
+ Assertions.assertEquals(sparkTemplateEntity.namespace(),
resultSparkEntity.namespace());
+ Assertions.assertEquals(sparkTemplateEntity.id(), resultSparkEntity.id());
+ Assertions.assertEquals(
+ sparkTemplateEntity.auditInfo().creator(),
resultSparkEntity.auditInfo().creator());
+ }
+
+ @Test
+ public void testJobPO() {
+ JobEntity jobEntity =
+ JobEntity.builder()
+ .withId(1L)
+ .withJobExecutionId("job-execution-1")
+ .withJobTemplateName("test-job-template")
+ .withStatus(JobHandle.Status.QUEUED)
+ .withNamespace(NamespaceUtil.ofJob("test"))
+ .withAuditInfo(
+
AuditInfo.builder().withCreator("test").withCreateTime(Instant.now()).build())
+ .build();
+
+ JobPO.JobPOBuilder builder = JobPO.builder().withMetalakeId(1L);
+ JobPO jobPO = JobPO.initializeJobPO(jobEntity, builder);
+ JobEntity resultEntity = JobPO.fromJobPO(jobPO,
NamespaceUtil.ofJob("test"));
+
+ Assertions.assertEquals(jobEntity.id(), resultEntity.id());
+ Assertions.assertEquals(jobEntity.jobExecutionId(),
resultEntity.jobExecutionId());
+ Assertions.assertEquals(jobEntity.jobTemplateName(),
resultEntity.jobTemplateName());
+ Assertions.assertEquals(jobEntity.status(), resultEntity.status());
+ Assertions.assertEquals(jobEntity.namespace(), resultEntity.namespace());
+ Assertions.assertEquals(jobEntity.auditInfo().creator(),
resultEntity.auditInfo().creator());
+ }
+}
diff --git
a/core/src/test/java/org/apache/gravitino/storage/relational/service/TestJobMetaService.java
b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestJobMetaService.java
new file mode 100644
index 0000000000..6c24addb95
--- /dev/null
+++
b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestJobMetaService.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.service;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.List;
+import org.apache.gravitino.EntityAlreadyExistsException;
+import org.apache.gravitino.exceptions.NoSuchEntityException;
+import org.apache.gravitino.job.JobHandle;
+import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.meta.BaseMetalake;
+import org.apache.gravitino.meta.JobEntity;
+import org.apache.gravitino.meta.JobTemplateEntity;
+import org.apache.gravitino.storage.RandomIdGenerator;
+import org.apache.gravitino.storage.relational.TestJDBCBackend;
+import org.apache.gravitino.utils.NameIdentifierUtil;
+import org.apache.gravitino.utils.NamespaceUtil;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class TestJobMetaService extends TestJDBCBackend {
+
+ private static final String METALAKE_NAME = "metalake_test_job_meta_service";
+
+ private static final AuditInfo AUDIT_INFO =
+
AuditInfo.builder().withCreator("test").withCreateTime(Instant.now()).build();
+
+ @Test
+ public void testInsertAndListJobs() throws IOException {
+ BaseMetalake metalake =
+ createBaseMakeLake(RandomIdGenerator.INSTANCE.nextId(), METALAKE_NAME,
AUDIT_INFO);
+ backend.insert(metalake, false);
+
+ JobTemplateEntity jobTemplate =
+ TestJobTemplateMetaService.newShellJobTemplateEntity(
+ "test_job_template", "test_comment", METALAKE_NAME);
+ JobTemplateMetaService.getInstance().insertJobTemplate(jobTemplate, false);
+
+ JobEntity job1 =
+ TestJobTemplateMetaService.newJobEntity(
+ jobTemplate.name(), JobHandle.Status.QUEUED, METALAKE_NAME);
+ Assertions.assertDoesNotThrow(() ->
JobMetaService.getInstance().insertJob(job1, false));
+
+ JobEntity job2 =
+ TestJobTemplateMetaService.newJobEntity(
+ jobTemplate.name(), JobHandle.Status.QUEUED, METALAKE_NAME);
+ Assertions.assertDoesNotThrow(() ->
JobMetaService.getInstance().insertJob(job2, false));
+
+ List<JobEntity> jobs =
+
JobMetaService.getInstance().listJobsByNamespace(NamespaceUtil.ofJob(METALAKE_NAME));
+ Assertions.assertEquals(2, jobs.size());
+ Assertions.assertTrue(jobs.contains(job1));
+ Assertions.assertTrue(jobs.contains(job2));
+
+ // Test listing jobs by job template identifier
+ List<JobEntity> jobsByTemplate =
+
JobMetaService.getInstance().listJobsByTemplateIdent(jobTemplate.nameIdentifier());
+ Assertions.assertEquals(2, jobsByTemplate.size());
+ Assertions.assertTrue(jobsByTemplate.contains(job1));
+ Assertions.assertTrue(jobsByTemplate.contains(job2));
+
+ // Test listing jobs by non-existing template identifier
+ List<JobEntity> emptyJobs =
+ JobMetaService.getInstance()
+ .listJobsByTemplateIdent(
+ NameIdentifierUtil.ofJobTemplate(METALAKE_NAME,
"non_existing_template"));
+ Assertions.assertTrue(emptyJobs.isEmpty());
+ }
+
+ @Test
+ public void testInsertAndGetJob() throws IOException {
+ BaseMetalake metalake =
+ createBaseMakeLake(RandomIdGenerator.INSTANCE.nextId(), METALAKE_NAME,
AUDIT_INFO);
+ backend.insert(metalake, false);
+
+ JobTemplateEntity jobTemplate =
+ TestJobTemplateMetaService.newShellJobTemplateEntity(
+ "test_job_template", "test_comment", METALAKE_NAME);
+ JobTemplateMetaService.getInstance().insertJobTemplate(jobTemplate, false);
+
+ JobEntity job =
+ TestJobTemplateMetaService.newJobEntity(
+ jobTemplate.name(), JobHandle.Status.QUEUED, METALAKE_NAME);
+ Assertions.assertDoesNotThrow(() ->
JobMetaService.getInstance().insertJob(job, false));
+
+ JobEntity retrievedJob =
+ JobMetaService.getInstance()
+ .getJobByIdentifier(NameIdentifierUtil.ofJob(METALAKE_NAME,
job.name()));
+ Assertions.assertEquals(job, retrievedJob);
+
+ // Test getting a job with a non-existing identifier
+ Assertions.assertThrows(
+ NoSuchEntityException.class,
+ () ->
+ JobMetaService.getInstance()
+ .getJobByIdentifier(NameIdentifierUtil.ofJob(METALAKE_NAME,
"non_existing_job")));
+
+ // Test insert duplicate job
+ Assertions.assertThrows(
+ EntityAlreadyExistsException.class,
+ () -> JobMetaService.getInstance().insertJob(job, false));
+
+ // Test insert job with overwrite
+ JobEntity jobOverwrite =
+ JobEntity.builder()
+ .withId(job.id())
+ .withJobExecutionId("job-execution-new")
+ .withStatus(JobHandle.Status.FAILED)
+ .withNamespace(job.namespace())
+ .withAuditInfo(job.auditInfo())
+ .withJobTemplateName(job.jobTemplateName())
+ .build();
+ Assertions.assertDoesNotThrow(() ->
JobMetaService.getInstance().insertJob(jobOverwrite, true));
+ JobEntity updatedJob =
+ JobMetaService.getInstance()
+ .getJobByIdentifier(NameIdentifierUtil.ofJob(METALAKE_NAME,
jobOverwrite.name()));
+ Assertions.assertEquals(jobOverwrite, updatedJob);
+ }
+
+ @Test
+ public void testDeleteJobsByLegacyTimeline() throws IOException {
+ BaseMetalake metalake =
+ createBaseMakeLake(RandomIdGenerator.INSTANCE.nextId(), METALAKE_NAME,
AUDIT_INFO);
+ backend.insert(metalake, false);
+
+ JobTemplateEntity jobTemplate =
+ TestJobTemplateMetaService.newShellJobTemplateEntity(
+ "test_job_template", "test_comment", METALAKE_NAME);
+ JobTemplateMetaService.getInstance().insertJobTemplate(jobTemplate, false);
+
+ JobEntity job =
+ TestJobTemplateMetaService.newJobEntity(
+ jobTemplate.name(), JobHandle.Status.QUEUED, METALAKE_NAME);
+ Assertions.assertDoesNotThrow(() ->
JobMetaService.getInstance().insertJob(job, false));
+
+ JobEntity retrievedJob =
+ JobMetaService.getInstance()
+ .getJobByIdentifier(NameIdentifierUtil.ofJob(METALAKE_NAME,
job.name()));
+ Assertions.assertEquals(job, retrievedJob);
+
+ long timestamp = System.currentTimeMillis();
+ JobEntity updatedJob =
+ JobEntity.builder()
+ .withId(job.id())
+ .withJobExecutionId(job.jobExecutionId())
+ .withStatus(JobHandle.Status.SUCCEEDED)
+ .withNamespace(job.namespace())
+ .withAuditInfo(job.auditInfo())
+ .withJobTemplateName(job.jobTemplateName())
+ .build();
+ Assertions.assertDoesNotThrow(() ->
JobMetaService.getInstance().insertJob(updatedJob, true));
+
+ retrievedJob =
+ JobMetaService.getInstance()
+ .getJobByIdentifier(NameIdentifierUtil.ofJob(METALAKE_NAME,
updatedJob.name()));
+ Assertions.assertEquals(updatedJob, retrievedJob);
+
+ long newTimestamp = timestamp + 1000;
+ Assertions.assertDoesNotThrow(
+ () ->
JobMetaService.getInstance().deleteJobsByLegacyTimeline(newTimestamp, 10));
+
+ List<JobEntity> jobs =
+
JobMetaService.getInstance().listJobsByNamespace(NamespaceUtil.ofJob(METALAKE_NAME));
+ Assertions.assertTrue(jobs.isEmpty(), "Jobs should be deleted by legacy
timeline");
+ }
+}
diff --git
a/core/src/test/java/org/apache/gravitino/storage/relational/service/TestJobTemplateMetaService.java
b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestJobTemplateMetaService.java
new file mode 100644
index 0000000000..2eb21c7620
--- /dev/null
+++
b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestJobTemplateMetaService.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.service;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.List;
+import org.apache.gravitino.EntityAlreadyExistsException;
+import org.apache.gravitino.exceptions.NoSuchEntityException;
+import org.apache.gravitino.job.JobHandle;
+import org.apache.gravitino.job.ShellJobTemplate;
+import org.apache.gravitino.job.SparkJobTemplate;
+import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.meta.BaseMetalake;
+import org.apache.gravitino.meta.JobEntity;
+import org.apache.gravitino.meta.JobTemplateEntity;
+import org.apache.gravitino.storage.RandomIdGenerator;
+import org.apache.gravitino.storage.relational.TestJDBCBackend;
+import org.apache.gravitino.utils.NameIdentifierUtil;
+import org.apache.gravitino.utils.NamespaceUtil;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class TestJobTemplateMetaService extends TestJDBCBackend {
+
+ private static final String METALAKE_NAME = "metalake_for_job_template_test";
+
+ private static final AuditInfo AUDIT_INFO =
+
AuditInfo.builder().withCreator("test").withCreateTime(Instant.now()).build();
+
+ @Test
+ public void testInsertAndListJobTemplates() throws IOException {
+ BaseMetalake metalake =
+ createBaseMakeLake(RandomIdGenerator.INSTANCE.nextId(), METALAKE_NAME,
AUDIT_INFO);
+ backend.insert(metalake, false);
+
+ JobTemplateMetaService jobTemplateMetaService =
JobTemplateMetaService.getInstance();
+ Assertions.assertThrows(
+ NoSuchEntityException.class,
+ () ->
+ jobTemplateMetaService.getJobTemplateByIdentifier(
+ NameIdentifierUtil.ofJobTemplate(METALAKE_NAME,
"non_existent_template")));
+
+ JobTemplateEntity testJobTemplateEntity1 =
+ newShellJobTemplateEntity(
+ "test_shell_template_1", "This is a test shell job template 1",
METALAKE_NAME);
+
+ Assertions.assertDoesNotThrow(
+ () -> jobTemplateMetaService.insertJobTemplate(testJobTemplateEntity1,
false));
+
+ JobTemplateEntity testJobTemplateEntity2 =
+ newSparkJobTemplateEntity(
+ "test_spark_template_1", "This is a test spark job template 1",
METALAKE_NAME);
+
+ Assertions.assertDoesNotThrow(
+ () -> jobTemplateMetaService.insertJobTemplate(testJobTemplateEntity2,
false));
+
+ List<JobTemplateEntity> jobTemplates =
+ jobTemplateMetaService.listJobTemplatesByNamespace(
+ NamespaceUtil.ofJobTemplate(METALAKE_NAME));
+
+ Assertions.assertEquals(2, jobTemplates.size());
+ Assertions.assertTrue(jobTemplates.contains(testJobTemplateEntity1));
+ Assertions.assertTrue(jobTemplates.contains(testJobTemplateEntity2));
+
+ // Test insert duplicate job template without overwrite
+ Assertions.assertThrows(
+ EntityAlreadyExistsException.class,
+ () -> jobTemplateMetaService.insertJobTemplate(testJobTemplateEntity1,
false));
+
+ // Test insert duplicate job template with overwrite
+ JobTemplateEntity updatedJobTemplateEntity1 =
+ JobTemplateEntity.builder()
+ .withName(testJobTemplateEntity1.name())
+ .withComment("Updated comment for test shell job template 1")
+ .withId(testJobTemplateEntity1.id())
+ .withNamespace(testJobTemplateEntity1.namespace())
+ .withTemplateContent(testJobTemplateEntity1.templateContent())
+ .withAuditInfo(testJobTemplateEntity1.auditInfo())
+ .build();
+ Assertions.assertDoesNotThrow(
+ () ->
jobTemplateMetaService.insertJobTemplate(updatedJobTemplateEntity1, true));
+ JobTemplateEntity fetchedJobTemplateEntity1 =
+ jobTemplateMetaService.getJobTemplateByIdentifier(
+ NameIdentifierUtil.ofJobTemplate(METALAKE_NAME,
testJobTemplateEntity1.name()));
+ Assertions.assertEquals(updatedJobTemplateEntity1,
fetchedJobTemplateEntity1);
+
+ jobTemplates =
+ jobTemplateMetaService.listJobTemplatesByNamespace(
+ NamespaceUtil.ofJobTemplate(METALAKE_NAME));
+ Assertions.assertEquals(2, jobTemplates.size());
+ Assertions.assertTrue(jobTemplates.contains(updatedJobTemplateEntity1));
+ Assertions.assertTrue(jobTemplates.contains(testJobTemplateEntity2));
+ Assertions.assertFalse(jobTemplates.contains(testJobTemplateEntity1));
+ }
+
+ @Test
+ public void testInsertAndSelectJobTemplate() throws IOException {
+ BaseMetalake metalake =
+ createBaseMakeLake(RandomIdGenerator.INSTANCE.nextId(), METALAKE_NAME,
AUDIT_INFO);
+ backend.insert(metalake, false);
+
+ JobTemplateMetaService jobTemplateMetaService =
JobTemplateMetaService.getInstance();
+
+ JobTemplateEntity shellJobTemplate =
+ newShellJobTemplateEntity("shell_template", "A shell job template",
METALAKE_NAME);
+ jobTemplateMetaService.insertJobTemplate(shellJobTemplate, false);
+
+ JobTemplateEntity sparkJobTemplate =
+ newSparkJobTemplateEntity("spark_template", "A spark job template",
METALAKE_NAME);
+ jobTemplateMetaService.insertJobTemplate(sparkJobTemplate, false);
+
+ // Test select by identifier
+ JobTemplateEntity fetchedShellJobTemplate =
+ jobTemplateMetaService.getJobTemplateByIdentifier(
+ NameIdentifierUtil.ofJobTemplate(METALAKE_NAME, "shell_template"));
+ Assertions.assertEquals(shellJobTemplate, fetchedShellJobTemplate);
+
+ JobTemplateEntity fetchedSparkJobTemplate =
+ jobTemplateMetaService.getJobTemplateByIdentifier(
+ NameIdentifierUtil.ofJobTemplate(METALAKE_NAME, "spark_template"));
+ Assertions.assertEquals(sparkJobTemplate, fetchedSparkJobTemplate);
+
+ // Test select non-existent template
+ Assertions.assertThrows(
+ NoSuchEntityException.class,
+ () ->
+ jobTemplateMetaService.getJobTemplateByIdentifier(
+ NameIdentifierUtil.ofJobTemplate(METALAKE_NAME,
"non_existent_template")));
+ }
+
+ @Test
+ public void testInsertAndDeleteJobTemplate() throws IOException {
+ BaseMetalake metalake =
+ createBaseMakeLake(RandomIdGenerator.INSTANCE.nextId(), METALAKE_NAME,
AUDIT_INFO);
+ backend.insert(metalake, false);
+
+ JobTemplateMetaService jobTemplateMetaService =
JobTemplateMetaService.getInstance();
+
+ JobTemplateEntity jobTemplateEntity =
+ newShellJobTemplateEntity(
+ "job_template_to_delete", "A job template to delete",
METALAKE_NAME);
+ jobTemplateMetaService.insertJobTemplate(jobTemplateEntity, false);
+
+ // Verify insertion
+ JobTemplateEntity fetchedJobTemplate =
+ jobTemplateMetaService.getJobTemplateByIdentifier(
+ NameIdentifierUtil.ofJobTemplate(METALAKE_NAME,
"job_template_to_delete"));
+ Assertions.assertEquals(jobTemplateEntity, fetchedJobTemplate);
+
+ // Delete the job template
+ boolean deleted =
+ jobTemplateMetaService.deleteJobTemplate(
+ NameIdentifierUtil.ofJobTemplate(METALAKE_NAME,
"job_template_to_delete"));
+ Assertions.assertTrue(deleted);
+
+ deleted =
+ jobTemplateMetaService.deleteJobTemplate(
+ NameIdentifierUtil.ofJobTemplate(METALAKE_NAME,
"job_template_to_delete"));
+ Assertions.assertFalse(deleted);
+ }
+
+ @Test
+ public void testDeleteJobTemplateWithJobs() throws IOException {
+ BaseMetalake metalake =
+ createBaseMakeLake(RandomIdGenerator.INSTANCE.nextId(), METALAKE_NAME,
AUDIT_INFO);
+ backend.insert(metalake, false);
+
+ JobTemplateMetaService jobTemplateMetaService =
JobTemplateMetaService.getInstance();
+
+ JobTemplateEntity jobTemplateEntity =
+ newShellJobTemplateEntity(
+ "job_template_with_jobs", "A job template with jobs",
METALAKE_NAME);
+ jobTemplateMetaService.insertJobTemplate(jobTemplateEntity, false);
+
+ // Create a job using the template
+ JobEntity jobEntity1 =
+ newJobEntity("job_template_with_jobs", JobHandle.Status.STARTED,
METALAKE_NAME);
+ backend.insert(jobEntity1, false);
+
+ JobEntity jobEntity2 =
+ newJobEntity("job_template_with_jobs", JobHandle.Status.SUCCEEDED,
METALAKE_NAME);
+ backend.insert(jobEntity2, false);
+
+ boolean deleted =
+ jobTemplateMetaService.deleteJobTemplate(
+ NameIdentifierUtil.ofJobTemplate(METALAKE_NAME,
"job_template_with_jobs"));
+ Assertions.assertTrue(deleted);
+
+ // Verify that the jobs are deleted
+ List<JobEntity> jobs =
+
JobMetaService.getInstance().listJobsByNamespace(NamespaceUtil.ofJob(METALAKE_NAME));
+ Assertions.assertEquals(0, jobs.size());
+ }
+
+ static JobTemplateEntity newShellJobTemplateEntity(String name, String
comment, String metalake) {
+ ShellJobTemplate shellJobTemplate =
+ ShellJobTemplate.builder()
+ .withName(name)
+ .withComment(comment)
+ .withExecutable("/bin/echo")
+ .build();
+
+ return JobTemplateEntity.builder()
+ .withId(RandomIdGenerator.INSTANCE.nextId())
+ .withName(name)
+ .withNamespace(NamespaceUtil.ofJobTemplate(metalake))
+
.withTemplateContent(JobTemplateEntity.TemplateContent.fromJobTemplate(shellJobTemplate))
+ .withAuditInfo(AUDIT_INFO)
+ .build();
+ }
+
+ static JobTemplateEntity newSparkJobTemplateEntity(String name, String
comment, String metalake) {
+ SparkJobTemplate sparkJobTemplate =
+ SparkJobTemplate.builder()
+ .withName(name)
+ .withComment(comment)
+ .withClassName("org.apache.spark.examples.SparkPi")
+ .withExecutable("file:/path/to/spark-examples.jar")
+ .build();
+
+ return JobTemplateEntity.builder()
+ .withId(RandomIdGenerator.INSTANCE.nextId())
+ .withName(name)
+ .withNamespace(NamespaceUtil.ofJobTemplate(metalake))
+
.withTemplateContent(JobTemplateEntity.TemplateContent.fromJobTemplate(sparkJobTemplate))
+ .withAuditInfo(AUDIT_INFO)
+ .build();
+ }
+
+ static JobEntity newJobEntity(String templateName, JobHandle.Status status,
String metalake) {
+ return JobEntity.builder()
+ .withId(RandomIdGenerator.INSTANCE.nextId())
+ .withJobExecutionId(RandomIdGenerator.INSTANCE.nextId() + "")
+ .withNamespace(NamespaceUtil.ofJob(metalake))
+ .withJobTemplateName(templateName)
+ .withStatus(status)
+ .withAuditInfo(AUDIT_INFO)
+ .build();
+ }
+}
diff --git a/scripts/h2/schema-1.0.0-h2.sql b/scripts/h2/schema-1.0.0-h2.sql
index a02f0eb427..a245174735 100644
--- a/scripts/h2/schema-1.0.0-h2.sql
+++ b/scripts/h2/schema-1.0.0-h2.sql
@@ -383,3 +383,34 @@ CREATE TABLE IF NOT EXISTS `policy_relation_meta` (
KEY `idx_pid` (`policy_id`),
KEY `idx_prmid` (`metadata_object_id`)
) ENGINE=InnoDB;
+
+CREATE TABLE IF NOT EXISTS `job_template_meta` (
+ `job_template_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'job template id',
+ `job_template_name` VARCHAR(128) NOT NULL COMMENT 'job template name',
+ `metalake_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'metalake id',
+ `job_template_comment` TEXT DEFAULT NULL COMMENT 'job template comment',
+ `job_template_content` MEDIUMTEXT NOT NULL COMMENT 'job template content',
+ `audit_info` MEDIUMTEXT NOT NULL COMMENT 'job template audit info',
+ `current_version` INT UNSIGNED NOT NULL DEFAULT 1 COMMENT 'job template
current version',
+ `last_version` INT UNSIGNED NOT NULL DEFAULT 1 COMMENT 'job template last
version',
+ `deleted_at` BIGINT(20) UNSIGNED NOT NULL DEFAULT 0 COMMENT 'job template
deleted at',
+ PRIMARY KEY (`job_template_id`),
+ UNIQUE KEY `uk_mid_jtn_del` (`metalake_id`, `job_template_name`,
`deleted_at`)
+) ENGINE=InnoDB;
+
+CREATE TABLE IF NOT EXISTS `job_run_meta` (
+ `job_run_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'job run id',
+ `job_template_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'job template id',
+ `metalake_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'metalake id',
+ `job_execution_id` varchar(256) NOT NULL COMMENT 'job execution id',
+ `job_run_status` varchar(64) NOT NULL COMMENT 'job run status',
+ `job_finished_at` BIGINT(20) UNSIGNED NOT NULL DEFAULT 0 COMMENT 'job
finished at',
+ `audit_info` MEDIUMTEXT NOT NULL COMMENT 'job run audit info',
+ `current_version` INT UNSIGNED NOT NULL DEFAULT 1 COMMENT 'job run current
version',
+ `last_version` INT UNSIGNED NOT NULL DEFAULT 1 COMMENT 'job run last
version',
+ `deleted_at` BIGINT(20) UNSIGNED NOT NULL DEFAULT 0 COMMENT 'job run
deleted at',
+ PRIMARY KEY (`job_run_id`),
+ UNIQUE KEY `uk_mid_jei_del` (`metalake_id`, `job_execution_id`,
`deleted_at`),
+ KEY `idx_job_template_id` (`job_template_id`),
+ KEY `idx_job_execution_id` (`job_execution_id`)
+) ENGINE=InnoDB;
diff --git a/scripts/h2/upgrade-0.9.0-to-1.0.0-h2.sql
b/scripts/h2/upgrade-0.9.0-to-1.0.0-h2.sql
index f156d49551..7ceca25db4 100644
--- a/scripts/h2/upgrade-0.9.0-to-1.0.0-h2.sql
+++ b/scripts/h2/upgrade-0.9.0-to-1.0.0-h2.sql
@@ -67,4 +67,35 @@ ALTER TABLE `model_version_info` ADD COLUMN
`model_version_uri_name` VARCHAR(256
ALTER TABLE `model_version_info` DROP INDEX `uk_mid_ver_del`;
ALTER TABLE `model_version_info` ADD CONSTRAINT `uk_mid_ver_uri_del` UNIQUE
(`model_id`, `version`, `model_version_uri_name`, `deleted_at`);
-- remove the default value for model_version_uri_name
-ALTER TABLE `model_version_info` ALTER COLUMN `model_version_uri_name` DROP
DEFAULT;
\ No newline at end of file
+ALTER TABLE `model_version_info` ALTER COLUMN `model_version_uri_name` DROP
DEFAULT;
+
+CREATE TABLE IF NOT EXISTS `job_template_meta` (
+ `job_template_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'job template id',
+ `job_template_name` VARCHAR(128) NOT NULL COMMENT 'job template name',
+ `metalake_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'metalake id',
+ `job_template_comment` TEXT DEFAULT NULL COMMENT 'job template comment',
+ `job_template_content` MEDIUMTEXT NOT NULL COMMENT 'job template content',
+ `audit_info` MEDIUMTEXT NOT NULL COMMENT 'job template audit info',
+ `current_version` INT UNSIGNED NOT NULL DEFAULT 1 COMMENT 'job template
current version',
+ `last_version` INT UNSIGNED NOT NULL DEFAULT 1 COMMENT 'job template last
version',
+ `deleted_at` BIGINT(20) UNSIGNED NOT NULL DEFAULT 0 COMMENT 'job template
deleted at',
+ PRIMARY KEY (`job_template_id`),
+ UNIQUE KEY `uk_mid_jtn_del` (`metalake_id`, `job_template_name`,
`deleted_at`)
+) ENGINE=InnoDB;
+
+CREATE TABLE IF NOT EXISTS `job_run_meta` (
+ `job_run_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'job run id',
+ `job_template_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'job template id',
+ `metalake_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'metalake id',
+ `job_execution_id` varchar(256) NOT NULL COMMENT 'job execution id',
+ `job_run_status` varchar(64) NOT NULL COMMENT 'job run status',
+ `job_finished_at` BIGINT(20) UNSIGNED NOT NULL DEFAULT 0 COMMENT 'job
finished at',
+ `audit_info` MEDIUMTEXT NOT NULL COMMENT 'job run audit info',
+ `current_version` INT UNSIGNED NOT NULL DEFAULT 1 COMMENT 'job run current
version',
+ `last_version` INT UNSIGNED NOT NULL DEFAULT 1 COMMENT 'job run last
version',
+ `deleted_at` BIGINT(20) UNSIGNED NOT NULL DEFAULT 0 COMMENT 'job run
deleted at',
+ PRIMARY KEY (`job_run_id`),
+ UNIQUE KEY `uk_mid_jei_del` (`metalake_id`, `job_execution_id`,
`deleted_at`),
+ KEY `idx_job_template_id` (`job_template_id`),
+ KEY `idx_job_execution_id` (`job_execution_id`)
+) ENGINE=InnoDB;
diff --git a/scripts/mysql/schema-1.0.0-mysql.sql
b/scripts/mysql/schema-1.0.0-mysql.sql
index b11c729432..07c14ad304 100644
--- a/scripts/mysql/schema-1.0.0-mysql.sql
+++ b/scripts/mysql/schema-1.0.0-mysql.sql
@@ -373,4 +373,35 @@ CREATE TABLE IF NOT EXISTS `policy_relation_meta` (
UNIQUE KEY `uk_pi_mi_mo_del` (`policy_id`, `metadata_object_id`,
`metadata_object_type`, `deleted_at`),
KEY `idx_pid` (`policy_id`),
KEY `idx_mid` (`metadata_object_id`)
-) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT 'policy
metadata object relation';
\ No newline at end of file
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT 'policy
metadata object relation';
+
+CREATE TABLE IF NOT EXISTS `job_template_meta` (
+ `job_template_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'job template id',
+ `job_template_name` VARCHAR(128) NOT NULL COMMENT 'job template name',
+ `metalake_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'metalake id',
+ `job_template_comment` TEXT DEFAULT NULL COMMENT 'job template comment',
+ `job_template_content` MEDIUMTEXT NOT NULL COMMENT 'job template content',
+ `audit_info` MEDIUMTEXT NOT NULL COMMENT 'job template audit info',
+ `current_version` INT UNSIGNED NOT NULL DEFAULT 1 COMMENT 'job template
current version',
+ `last_version` INT UNSIGNED NOT NULL DEFAULT 1 COMMENT 'job template last
version',
+ `deleted_at` BIGINT(20) UNSIGNED NOT NULL DEFAULT 0 COMMENT 'job template
deleted at',
+ PRIMARY KEY (`job_template_id`),
+ UNIQUE KEY `uk_mid_jtn_del` (`metalake_id`, `job_template_name`,
`deleted_at`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT 'job
template metadata';
+
+CREATE TABLE IF NOT EXISTS `job_run_meta` (
+ `job_run_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'job run id',
+ `job_template_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'job template id',
+ `metalake_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'metalake id',
+ `job_execution_id` varchar(256) NOT NULL COMMENT 'job execution id',
+ `job_run_status` varchar(64) NOT NULL COMMENT 'job run status',
+ `job_finished_at` BIGINT(20) UNSIGNED NOT NULL DEFAULT 0 COMMENT 'job
finished at',
+ `audit_info` MEDIUMTEXT NOT NULL COMMENT 'job run audit info',
+ `current_version` INT UNSIGNED NOT NULL DEFAULT 1 COMMENT 'job run current
version',
+ `last_version` INT UNSIGNED NOT NULL DEFAULT 1 COMMENT 'job run last
version',
+ `deleted_at` BIGINT(20) UNSIGNED NOT NULL DEFAULT 0 COMMENT 'job run
deleted at',
+ PRIMARY KEY (`job_run_id`),
+ UNIQUE KEY `uk_mid_jei_del` (`metalake_id`, `job_execution_id`,
`deleted_at`),
+ KEY `idx_job_template_id` (`job_template_id`),
+ KEY `idx_job_execution_id` (`job_execution_id`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT 'job run
metadata';
diff --git a/scripts/mysql/upgrade-0.9.0-to-1.0.0-mysql.sql
b/scripts/mysql/upgrade-0.9.0-to-1.0.0-mysql.sql
index 48438ddfbe..c0890b5aa0 100644
--- a/scripts/mysql/upgrade-0.9.0-to-1.0.0-mysql.sql
+++ b/scripts/mysql/upgrade-0.9.0-to-1.0.0-mysql.sql
@@ -68,3 +68,34 @@ ALTER TABLE `model_version_info` DROP INDEX `uk_mid_ver_del`;
ALTER TABLE `model_version_info` ADD CONSTRAINT `uk_mid_ver_uri_del` UNIQUE
KEY (`model_id`, `version`, `model_version_uri_name`, `deleted_at`);
-- remove the default value for model_version_uri_name
ALTER TABLE `model_version_info` ALTER COLUMN `model_version_uri_name` DROP
DEFAULT;
+
+CREATE TABLE IF NOT EXISTS `job_template_meta` (
+ `job_template_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'job template id',
+ `job_template_name` VARCHAR(128) NOT NULL COMMENT 'job template name',
+ `metalake_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'metalake id',
+ `job_template_comment` TEXT DEFAULT NULL COMMENT 'job template comment',
+ `job_template_content` MEDIUMTEXT NOT NULL COMMENT 'job template content',
+ `audit_info` MEDIUMTEXT NOT NULL COMMENT 'job template audit info',
+ `current_version` INT UNSIGNED NOT NULL DEFAULT 1 COMMENT 'job template
current version',
+ `last_version` INT UNSIGNED NOT NULL DEFAULT 1 COMMENT 'job template last
version',
+ `deleted_at` BIGINT(20) UNSIGNED NOT NULL DEFAULT 0 COMMENT 'job template
deleted at',
+ PRIMARY KEY (`job_template_id`),
+ UNIQUE KEY `uk_mid_jtn_del` (`metalake_id`, `job_template_name`,
`deleted_at`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT 'job
template metadata';
+
+CREATE TABLE IF NOT EXISTS `job_run_meta` (
+ `job_run_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'job run id',
+ `job_template_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'job template id',
+ `metalake_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'metalake id',
+ `job_execution_id` varchar(256) NOT NULL COMMENT 'job execution id',
+ `job_run_status` varchar(64) NOT NULL COMMENT 'job run status',
+ `job_finished_at` BIGINT(20) UNSIGNED NOT NULL DEFAULT 0 COMMENT 'job
finished at',
+ `audit_info` MEDIUMTEXT NOT NULL COMMENT 'job run audit info',
+ `current_version` INT UNSIGNED NOT NULL DEFAULT 1 COMMENT 'job run current
version',
+ `last_version` INT UNSIGNED NOT NULL DEFAULT 1 COMMENT 'job run last
version',
+ `deleted_at` BIGINT(20) UNSIGNED NOT NULL DEFAULT 0 COMMENT 'job run
deleted at',
+ PRIMARY KEY (`job_run_id`),
+ UNIQUE KEY `uk_mid_jei_del` (`metalake_id`, `job_execution_id`,
`deleted_at`),
+ KEY `idx_job_template_id` (`job_template_id`),
+ KEY `idx_job_execution_id` (`job_execution_id`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT 'job run
metadata';
diff --git a/scripts/postgresql/schema-1.0.0-postgresql.sql
b/scripts/postgresql/schema-1.0.0-postgresql.sql
index 7c8ee084fa..d97ace9844 100644
--- a/scripts/postgresql/schema-1.0.0-postgresql.sql
+++ b/scripts/postgresql/schema-1.0.0-postgresql.sql
@@ -668,3 +668,59 @@ COMMENT ON COLUMN policy_relation_meta.audit_info IS
'policy relation audit info
COMMENT ON COLUMN policy_relation_meta.current_version IS 'policy relation
current version';
COMMENT ON COLUMN policy_relation_meta.last_version IS 'policy relation last
version';
COMMENT ON COLUMN policy_relation_meta.deleted_at IS 'policy relation deleted
at';
+
+
+CREATE TABLE IF NOT EXISTS job_template_meta (
+ job_template_id BIGINT NOT NULL,
+ job_template_name VARCHAR(128) NOT NULL,
+ metalake_id BIGINT NOT NULL,
+ job_template_comment TEXT DEFAULT NULL,
+ job_template_content TEXT NOT NULL,
+ audit_info TEXT NOT NULL,
+ current_version INT NOT NULL DEFAULT 1,
+ last_version INT NOT NULL DEFAULT 1,
+ deleted_at BIGINT NOT NULL DEFAULT 0,
+ PRIMARY KEY (job_template_id),
+ UNIQUE (metalake_id, job_template_name, deleted_at)
+);
+
+COMMENT ON TABLE job_template_meta IS 'job template metadata';
+COMMENT ON COLUMN job_template_meta.job_template_id IS 'job template id';
+COMMENT ON COLUMN job_template_meta.job_template_name IS 'job template name';
+COMMENT ON COLUMN job_template_meta.metalake_id IS 'metalake id';
+COMMENT ON COLUMN job_template_meta.job_template_comment IS 'job template
comment';
+COMMENT ON COLUMN job_template_meta.job_template_content IS 'job template
content';
+COMMENT ON COLUMN job_template_meta.audit_info IS 'job template audit info';
+COMMENT ON COLUMN job_template_meta.current_version IS 'job template current
version';
+COMMENT ON COLUMN job_template_meta.last_version IS 'job template last
version';
+COMMENT ON COLUMN job_template_meta.deleted_at IS 'job template deleted at';
+
+
+CREATE TABLE IF NOT EXISTS job_run_meta (
+ job_run_id BIGINT NOT NULL,
+ job_template_id BIGINT NOT NULL,
+ metalake_id BIGINT NOT NULL,
+ job_execution_id VARCHAR(256) NOT NULL,
+ job_run_status VARCHAR(64) NOT NULL,
+ job_finished_at BIGINT NOT NULL DEFAULT 0,
+ audit_info TEXT NOT NULL,
+ current_version INT NOT NULL DEFAULT 1,
+ last_version INT NOT NULL DEFAULT 1,
+ deleted_at BIGINT NOT NULL DEFAULT 0,
+ PRIMARY KEY (job_run_id),
+ UNIQUE (metalake_id, job_execution_id, deleted_at)
+);
+
+CREATE INDEX IF NOT EXISTS idx_job_template_id ON job_run_meta
(job_template_id);
+CREATE INDEX IF NOT EXISTS idx_job_execution_id ON job_run_meta
(job_execution_id);
+COMMENT ON TABLE job_run_meta IS 'job run metadata';
+COMMENT ON COLUMN job_run_meta.job_run_id IS 'job run id';
+COMMENT ON COLUMN job_run_meta.job_template_id IS 'job template id';
+COMMENT ON COLUMN job_run_meta.metalake_id IS 'metalake id';
+COMMENT ON COLUMN job_run_meta.job_execution_id IS 'job execution id';
+COMMENT ON COLUMN job_run_meta.job_run_status IS 'job run status';
+COMMENT ON COLUMN job_run_meta.job_finished_at IS 'job run finished at';
+COMMENT ON COLUMN job_run_meta.audit_info IS 'job run audit info';
+COMMENT ON COLUMN job_run_meta.current_version IS 'job run current version';
+COMMENT ON COLUMN job_run_meta.last_version IS 'job run last version';
+COMMENT ON COLUMN job_run_meta.deleted_at IS 'job run deleted at';
diff --git a/scripts/postgresql/upgrade-0.9.0-to-1.0.0-postgresql.sql
b/scripts/postgresql/upgrade-0.9.0-to-1.0.0-postgresql.sql
index e96d1a82ae..a95ad69517 100644
--- a/scripts/postgresql/upgrade-0.9.0-to-1.0.0-postgresql.sql
+++ b/scripts/postgresql/upgrade-0.9.0-to-1.0.0-postgresql.sql
@@ -104,3 +104,59 @@ ALTER TABLE model_version_info DROP CONSTRAINT
model_version_info_model_id_versi
ALTER TABLE model_version_info ADD CONSTRAINT uk_mid_ver_uri_del UNIQUE
(model_id, version, model_version_uri_name, deleted_at);
-- remove the default value for model_version_uri_name
ALTER TABLE model_version_info ALTER COLUMN model_version_uri_name DROP
DEFAULT;
+
+
+CREATE TABLE IF NOT EXISTS job_template_meta (
+ job_template_id BIGINT NOT NULL,
+ job_template_name VARCHAR(128) NOT NULL,
+ metalake_id BIGINT NOT NULL,
+ job_template_comment TEXT DEFAULT NULL,
+ job_template_content TEXT NOT NULL,
+ audit_info TEXT NOT NULL,
+ current_version INT NOT NULL DEFAULT 1,
+ last_version INT NOT NULL DEFAULT 1,
+ deleted_at BIGINT NOT NULL DEFAULT 0,
+ PRIMARY KEY (job_template_id),
+ UNIQUE (metalake_id, job_template_name, deleted_at)
+);
+
+COMMENT ON TABLE job_template_meta IS 'job template metadata';
+COMMENT ON COLUMN job_template_meta.job_template_id IS 'job template id';
+COMMENT ON COLUMN job_template_meta.job_template_name IS 'job template name';
+COMMENT ON COLUMN job_template_meta.metalake_id IS 'metalake id';
+COMMENT ON COLUMN job_template_meta.job_template_comment IS 'job template
comment';
+COMMENT ON COLUMN job_template_meta.job_template_content IS 'job template
content';
+COMMENT ON COLUMN job_template_meta.audit_info IS 'job template audit info';
+COMMENT ON COLUMN job_template_meta.current_version IS 'job template current
version';
+COMMENT ON COLUMN job_template_meta.last_version IS 'job template last
version';
+COMMENT ON COLUMN job_template_meta.deleted_at IS 'job template deleted at';
+
+
+CREATE TABLE IF NOT EXISTS job_run_meta (
+ job_run_id BIGINT NOT NULL,
+ job_template_id BIGINT NOT NULL,
+ metalake_id BIGINT NOT NULL,
+ job_execution_id VARCHAR(256) NOT NULL,
+ job_run_status VARCHAR(64) NOT NULL,
+ job_finished_at BIGINT NOT NULL DEFAULT 0,
+ audit_info TEXT NOT NULL,
+ current_version INT NOT NULL DEFAULT 1,
+ last_version INT NOT NULL DEFAULT 1,
+ deleted_at BIGINT NOT NULL DEFAULT 0,
+ PRIMARY KEY (job_run_id),
+ UNIQUE (metalake_id, job_execution_id, deleted_at)
+);
+
+CREATE INDEX IF NOT EXISTS idx_job_template_id ON job_run_meta
(job_template_id);
+CREATE INDEX IF NOT EXISTS idx_job_execution_id ON job_run_meta
(job_execution_id);
+COMMENT ON TABLE job_run_meta IS 'job run metadata';
+COMMENT ON COLUMN job_run_meta.job_run_id IS 'job run id';
+COMMENT ON COLUMN job_run_meta.job_template_id IS 'job template id';
+COMMENT ON COLUMN job_run_meta.metalake_id IS 'metalake id';
+COMMENT ON COLUMN job_run_meta.job_execution_id IS 'job execution id';
+COMMENT ON COLUMN job_run_meta.job_run_status IS 'job run status';
+COMMENT ON COLUMN job_run_meta.job_finished_at IS 'job finished at';
+COMMENT ON COLUMN job_run_meta.audit_info IS 'job run audit info';
+COMMENT ON COLUMN job_run_meta.current_version IS 'job run current version';
+COMMENT ON COLUMN job_run_meta.last_version IS 'job run last version';
+COMMENT ON COLUMN job_run_meta.deleted_at IS 'job run deleted at';