This is an automated email from the ASF dual-hosted git repository.

yuqi4733 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new cda8017dd4 [#7507] feat(core): Add storage layout for job system 
(#7874)
cda8017dd4 is described below

commit cda8017dd4cb09908b801886d764061275e047f5
Author: Jerry Shao <[email protected]>
AuthorDate: Mon Aug 4 19:57:28 2025 +0800

    [#7507] feat(core): Add storage layout for job system (#7874)
    
    ### What changes were proposed in this pull request?
    
    This PR adds the storage layout for job system to store the job template
    and job to the Gravitino's store.
    
    ### Why are the changes needed?
    
    Fix: #7507
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    UTs to cover the code.
---
 .../java/org/apache/gravitino/job/JobManager.java  |  31 ++-
 .../gravitino/job/local/LocalJobExecutor.java      |   5 +
 .../gravitino/storage/relational/JDBCBackend.java  |  37 ++-
 .../storage/relational/mapper/JobMetaMapper.java   |  70 ++++++
 .../mapper/JobMetaSQLProviderFactory.java          |  97 ++++++++
 .../relational/mapper/JobTemplateMetaMapper.java   |  68 ++++++
 .../mapper/JobTemplateMetaSQLProviderFactory.java  |  90 ++++++++
 .../provider/base/JobMetaBaseSQLProvider.java      | 176 ++++++++++++++
 .../base/JobTemplateMetaBaseSQLProvider.java       | 123 ++++++++++
 .../postgresql/JobMetaPostgreSQLProvider.java      | 107 +++++++++
 .../JobTemplateMetaPostgreSQLProvider.java         |  84 +++++++
 .../gravitino/storage/relational/po/JobPO.java     | 143 ++++++++++++
 .../storage/relational/po/JobTemplatePO.java       | 133 +++++++++++
 .../storage/relational/service/JobMetaService.java | 124 ++++++++++
 .../relational/service/JobTemplateMetaService.java | 133 +++++++++++
 .../relational/service/MetalakeMetaService.java    |  22 +-
 .../storage/relational/utils/POConverters.java     |   4 +-
 .../gravitino/storage/relational/po/TestJobPO.java | 144 ++++++++++++
 .../relational/service/TestJobMetaService.java     | 183 +++++++++++++++
 .../service/TestJobTemplateMetaService.java        | 257 +++++++++++++++++++++
 scripts/h2/schema-1.0.0-h2.sql                     |  31 +++
 scripts/h2/upgrade-0.9.0-to-1.0.0-h2.sql           |  33 ++-
 scripts/mysql/schema-1.0.0-mysql.sql               |  33 ++-
 scripts/mysql/upgrade-0.9.0-to-1.0.0-mysql.sql     |  31 +++
 scripts/postgresql/schema-1.0.0-postgresql.sql     |  56 +++++
 .../upgrade-0.9.0-to-1.0.0-postgresql.sql          |  56 +++++
 26 files changed, 2250 insertions(+), 21 deletions(-)

diff --git a/core/src/main/java/org/apache/gravitino/job/JobManager.java 
b/core/src/main/java/org/apache/gravitino/job/JobManager.java
index 2b4120ec64..8bb193a7a3 100644
--- a/core/src/main/java/org/apache/gravitino/job/JobManager.java
+++ b/core/src/main/java/org/apache/gravitino/job/JobManager.java
@@ -335,10 +335,12 @@ public class JobManager implements 
JobOperationDispatcher, Closeable {
     // Retrieve the job entity, will throw NoSuchJobException if the job does 
not exist.
     JobEntity jobEntity = getJob(metalake, jobId);
 
-    if (jobEntity.status() == JobHandle.Status.CANCELLED
+    if (jobEntity.status() == JobHandle.Status.CANCELLING
+        || jobEntity.status() == JobHandle.Status.CANCELLED
         || jobEntity.status() == JobHandle.Status.SUCCEEDED
         || jobEntity.status() == JobHandle.Status.FAILED) {
-      // If the job is already cancelled, succeeded, or failed, we do not need 
to cancel it again.
+      // If the job is already cancelling, cancelled, succeeded, or failed, we 
do not need to cancel
+      // it again.
       return jobEntity;
     }
 
@@ -350,6 +352,8 @@ public class JobManager implements JobOperationDispatcher, 
Closeable {
           String.format("Failed to cancel job with ID %s under metalake %s", 
jobId, metalake), e);
     }
 
+    // TODO(jerry). Implement a background thread to monitor the job status 
and update it. Also,
+    //  we should delete the finished job entities after a certain period of 
time.
     // Update the job status to CANCELING
     JobEntity newJobEntity =
         JobEntity.builder()
@@ -366,15 +370,20 @@ public class JobManager implements 
JobOperationDispatcher, Closeable {
                     .withLastModifiedTime(Instant.now())
                     .build())
             .build();
-
-    try {
-      // Update the job entity in the entity store
-      entityStore.put(newJobEntity, true /* overwrite */);
-      return newJobEntity;
-    } catch (IOException e) {
-      throw new RuntimeException(
-          String.format("Failed to update job entity %s to CANCELING status", 
newJobEntity), e);
-    }
+    return TreeLockUtils.doWithTreeLock(
+        NameIdentifierUtil.ofJob(metalake, jobId),
+        LockType.WRITE,
+        () -> {
+          try {
+            // Update the job entity in the entity store
+            entityStore.put(newJobEntity, true /* overwrite */);
+            return newJobEntity;
+          } catch (IOException e) {
+            throw new RuntimeException(
+                String.format("Failed to update job entity %s to CANCELING 
status", newJobEntity),
+                e);
+          }
+        });
   }
 
   @Override
diff --git 
a/core/src/main/java/org/apache/gravitino/job/local/LocalJobExecutor.java 
b/core/src/main/java/org/apache/gravitino/job/local/LocalJobExecutor.java
index b8e41b851e..5d114a4605 100644
--- a/core/src/main/java/org/apache/gravitino/job/local/LocalJobExecutor.java
+++ b/core/src/main/java/org/apache/gravitino/job/local/LocalJobExecutor.java
@@ -198,6 +198,11 @@ public class LocalJobExecutor implements JobExecutor {
         return;
       }
 
+      if (statusPair.getLeft() == JobHandle.Status.CANCELLING) {
+        LOG.warn("Job {} is already being cancelled, no action taken", jobId);
+        return;
+      }
+
       // If the job is queued.
       if (statusPair.getLeft() == JobHandle.Status.QUEUED) {
         waitingQueue.removeIf(p -> p.getLeft().equals(jobId));
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java 
b/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java
index 966b4e3250..b9440c9ae9 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java
@@ -42,6 +42,8 @@ import org.apache.gravitino.meta.BaseMetalake;
 import org.apache.gravitino.meta.CatalogEntity;
 import org.apache.gravitino.meta.FilesetEntity;
 import org.apache.gravitino.meta.GroupEntity;
+import org.apache.gravitino.meta.JobEntity;
+import org.apache.gravitino.meta.JobTemplateEntity;
 import org.apache.gravitino.meta.ModelEntity;
 import org.apache.gravitino.meta.ModelVersionEntity;
 import org.apache.gravitino.meta.PolicyEntity;
@@ -56,6 +58,8 @@ import 
org.apache.gravitino.storage.relational.database.H2Database;
 import org.apache.gravitino.storage.relational.service.CatalogMetaService;
 import org.apache.gravitino.storage.relational.service.FilesetMetaService;
 import org.apache.gravitino.storage.relational.service.GroupMetaService;
+import org.apache.gravitino.storage.relational.service.JobMetaService;
+import org.apache.gravitino.storage.relational.service.JobTemplateMetaService;
 import org.apache.gravitino.storage.relational.service.MetalakeMetaService;
 import org.apache.gravitino.storage.relational.service.ModelMetaService;
 import org.apache.gravitino.storage.relational.service.ModelVersionMetaService;
@@ -128,6 +132,11 @@ public class JDBCBackend implements RelationalBackend {
             
ModelVersionMetaService.getInstance().listModelVersionsByNamespace(namespace);
       case POLICY:
         return (List<E>) 
PolicyMetaService.getInstance().listPoliciesByNamespace(namespace);
+      case JOB_TEMPLATE:
+        return (List<E>)
+            
JobTemplateMetaService.getInstance().listJobTemplatesByNamespace(namespace);
+      case JOB:
+        return (List<E>) 
JobMetaService.getInstance().listJobsByNamespace(namespace);
       default:
         throw new UnsupportedEntityTypeException(
             "Unsupported entity type: %s for list operation", entityType);
@@ -178,6 +187,10 @@ public class JDBCBackend implements RelationalBackend {
       
ModelVersionMetaService.getInstance().insertModelVersion((ModelVersionEntity) 
e);
     } else if (e instanceof PolicyEntity) {
       PolicyMetaService.getInstance().insertPolicy((PolicyEntity) e, 
overwritten);
+    } else if (e instanceof JobTemplateEntity) {
+      
JobTemplateMetaService.getInstance().insertJobTemplate((JobTemplateEntity) e, 
overwritten);
+    } else if (e instanceof JobEntity) {
+      JobMetaService.getInstance().insertJob((JobEntity) e, overwritten);
     } else {
       throw new UnsupportedEntityTypeException(
           "Unsupported entity type: %s for insert operation", e.getClass());
@@ -252,6 +265,10 @@ public class JDBCBackend implements RelationalBackend {
         return (E) 
ModelVersionMetaService.getInstance().getModelVersionByIdentifier(ident);
       case POLICY:
         return (E) 
PolicyMetaService.getInstance().getPolicyByIdentifier(ident);
+      case JOB_TEMPLATE:
+        return (E) 
JobTemplateMetaService.getInstance().getJobTemplateByIdentifier(ident);
+      case JOB:
+        return (E) JobMetaService.getInstance().getJobByIdentifier(ident);
       default:
         throw new UnsupportedEntityTypeException(
             "Unsupported entity type: %s for get operation", entityType);
@@ -288,6 +305,8 @@ public class JDBCBackend implements RelationalBackend {
         return ModelVersionMetaService.getInstance().deleteModelVersion(ident);
       case POLICY:
         return PolicyMetaService.getInstance().deletePolicy(ident);
+      case JOB_TEMPLATE:
+        return JobTemplateMetaService.getInstance().deleteJobTemplate(ident);
       default:
         throw new UnsupportedEntityTypeException(
             "Unsupported entity type: %s for delete operation", entityType);
@@ -354,11 +373,12 @@ public class JDBCBackend implements RelationalBackend {
             .deleteModelVersionMetasByLegacyTimeline(
                 legacyTimeline, GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT);
       case JOB_TEMPLATE:
-        // TODO: Implement hard delete logic for job templates.
-        return 0;
+        return JobTemplateMetaService.getInstance()
+            .deleteJobTemplatesByLegacyTimeline(
+                legacyTimeline, GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT);
       case JOB:
-        // TODO: Implement hard delete logic for jobs.
-        return 0;
+        return JobMetaService.getInstance()
+            .deleteJobsByLegacyTimeline(legacyTimeline, 
GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT);
       case AUDIT:
         return 0;
         // TODO: Implement hard delete logic for these entity types.
@@ -478,6 +498,15 @@ public class JDBCBackend implements RelationalBackend {
           throw new IllegalArgumentException(
               String.format("ROLE_USER_REL doesn't support type %s", 
identType.name()));
         }
+
+      case JOB_TEMPLATE_JOB_REL:
+        if (identType == Entity.EntityType.JOB_TEMPLATE) {
+          return (List<E>) 
JobMetaService.getInstance().listJobsByTemplateIdent(nameIdentifier);
+        } else {
+          throw new IllegalArgumentException(
+              String.format("JOB_TEMPLATE_JOB_REL doesn't support type %s", 
identType.name()));
+        }
+
       default:
         throw new IllegalArgumentException(
             String.format("Doesn't support the relation type %s", relType));
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/JobMetaMapper.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/JobMetaMapper.java
new file mode 100644
index 0000000000..a8a45e3888
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/JobMetaMapper.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.mapper;
+
+import java.util.List;
+import org.apache.gravitino.storage.relational.po.JobPO;
+import org.apache.ibatis.annotations.DeleteProvider;
+import org.apache.ibatis.annotations.InsertProvider;
+import org.apache.ibatis.annotations.Param;
+import org.apache.ibatis.annotations.SelectProvider;
+import org.apache.ibatis.annotations.UpdateProvider;
+
+public interface JobMetaMapper {
+  String TABLE_NAME = "job_run_meta";
+
+  @InsertProvider(type = JobMetaSQLProviderFactory.class, method = 
"insertJobMeta")
+  void insertJobMeta(@Param("jobMeta") JobPO jobPO);
+
+  @InsertProvider(
+      type = JobMetaSQLProviderFactory.class,
+      method = "insertJobMetaOnDuplicateKeyUpdate")
+  void insertJobMetaOnDuplicateKeyUpdate(@Param("jobMeta") JobPO jobPO);
+
+  @SelectProvider(type = JobMetaSQLProviderFactory.class, method = 
"listJobPOsByMetalake")
+  List<JobPO> listJobPOsByMetalake(@Param("metalakeName") String metalakeName);
+
+  @SelectProvider(
+      type = JobMetaSQLProviderFactory.class,
+      method = "listJobPOsByMetalakeAndTemplate")
+  List<JobPO> listJobPOsByMetalakeAndTemplate(
+      @Param("metalakeName") String metalakeName, @Param("jobTemplateName") 
String jobTemplateName);
+
+  @SelectProvider(type = JobMetaSQLProviderFactory.class, method = 
"selectJobPOByMetalakeAndRunId")
+  JobPO selectJobPOByMetalakeAndRunId(
+      @Param("metalakeName") String metalakeName, @Param("jobRunId") Long 
jobRunId);
+
+  @UpdateProvider(
+      type = JobMetaSQLProviderFactory.class,
+      method = "softDeleteJobMetaByMetalakeAndTemplate")
+  Integer softDeleteJobMetaByMetalakeAndTemplate(
+      @Param("metalakeName") String metalakeName, @Param("jobTemplateName") 
String jobTemplateName);
+
+  @UpdateProvider(type = JobMetaSQLProviderFactory.class, method = 
"softDeleteJobMetasByMetalakeId")
+  void softDeleteJobMetasByMetalakeId(@Param("metalakeId") Long metalakeId);
+
+  @UpdateProvider(
+      type = JobMetaSQLProviderFactory.class,
+      method = "softDeleteJobMetasByLegacyTimeline")
+  void softDeleteJobMetasByLegacyTimeline(@Param("legacyTimeline") Long 
legacyTimeline);
+
+  @DeleteProvider(type = JobMetaSQLProviderFactory.class, method = 
"deleteJobMetasByLegacyTimeline")
+  Integer deleteJobMetasByLegacyTimeline(
+      @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit);
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/JobMetaSQLProviderFactory.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/JobMetaSQLProviderFactory.java
new file mode 100644
index 0000000000..25a4c924f6
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/JobMetaSQLProviderFactory.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.mapper;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import org.apache.gravitino.storage.relational.JDBCBackend;
+import 
org.apache.gravitino.storage.relational.mapper.provider.base.JobMetaBaseSQLProvider;
+import 
org.apache.gravitino.storage.relational.mapper.provider.postgresql.JobMetaPostgreSQLProvider;
+import org.apache.gravitino.storage.relational.po.JobPO;
+import org.apache.gravitino.storage.relational.session.SqlSessionFactoryHelper;
+import org.apache.ibatis.annotations.Param;
+
+public class JobMetaSQLProviderFactory {
+
+  private static final Map<JDBCBackend.JDBCBackendType, JobMetaBaseSQLProvider>
+      JOB_META_SQL_PROVIDER_MAP =
+          ImmutableMap.of(
+              JDBCBackend.JDBCBackendType.MYSQL, new JobMetaMySQLProvider(),
+              JDBCBackend.JDBCBackendType.H2, new JobMetaH2Provider(),
+              JDBCBackend.JDBCBackendType.POSTGRESQL, new 
JobMetaPostgreSQLProvider());
+
+  public static JobMetaBaseSQLProvider getProvider() {
+    String databaseId =
+        SqlSessionFactoryHelper.getInstance()
+            .getSqlSessionFactory()
+            .getConfiguration()
+            .getDatabaseId();
+
+    JDBCBackend.JDBCBackendType jdbcBackendType =
+        JDBCBackend.JDBCBackendType.fromString(databaseId);
+    return JOB_META_SQL_PROVIDER_MAP.get(jdbcBackendType);
+  }
+
+  static class JobMetaMySQLProvider extends JobMetaBaseSQLProvider {}
+
+  static class JobMetaH2Provider extends JobMetaBaseSQLProvider {}
+
+  public static String insertJobMeta(@Param("jobMeta") JobPO jobPO) {
+    return getProvider().insertJobMeta(jobPO);
+  }
+
+  public static String insertJobMetaOnDuplicateKeyUpdate(@Param("jobMeta") 
JobPO jobPO) {
+    return getProvider().insertJobMetaOnDuplicateKeyUpdate(jobPO);
+  }
+
+  public static String listJobPOsByMetalake(@Param("metalakeName") String 
metalakeName) {
+    return getProvider().listJobPOsByMetalake(metalakeName);
+  }
+
+  public static String listJobPOsByMetalakeAndTemplate(
+      @Param("metalakeName") String metalakeName,
+      @Param("jobTemplateName") String jobTemplateName) {
+    return getProvider().listJobPOsByMetalakeAndTemplate(metalakeName, 
jobTemplateName);
+  }
+
+  public static String selectJobPOByMetalakeAndRunId(
+      @Param("metalakeName") String metalakeName, @Param("jobRunId") Long 
jobRunId) {
+    return getProvider().selectJobPOByMetalakeAndRunId(metalakeName, jobRunId);
+  }
+
+  public static String softDeleteJobMetaByMetalakeAndTemplate(
+      @Param("metalakeName") String metalakeName,
+      @Param("jobTemplateName") String jobTemplateName) {
+    return getProvider().softDeleteJobMetaByMetalakeAndTemplate(metalakeName, 
jobTemplateName);
+  }
+
+  public static String softDeleteJobMetasByMetalakeId(@Param("metalakeId") 
Long metalakeId) {
+    return getProvider().softDeleteJobMetasByMetalakeId(metalakeId);
+  }
+
+  public static String deleteJobMetasByLegacyTimeline(
+      @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit) 
{
+    return getProvider().deleteJobMetasByLegacyTimeline(legacyTimeline, limit);
+  }
+
+  public static String softDeleteJobMetasByLegacyTimeline(
+      @Param("legacyTimeline") Long legacyTimeline) {
+    return getProvider().softDeleteJobMetasByLegacyTimeline(legacyTimeline);
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/JobTemplateMetaMapper.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/JobTemplateMetaMapper.java
new file mode 100644
index 0000000000..c902880908
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/JobTemplateMetaMapper.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.mapper;
+
+import java.util.List;
+import org.apache.gravitino.storage.relational.po.JobTemplatePO;
+import org.apache.ibatis.annotations.DeleteProvider;
+import org.apache.ibatis.annotations.InsertProvider;
+import org.apache.ibatis.annotations.Param;
+import org.apache.ibatis.annotations.SelectProvider;
+import org.apache.ibatis.annotations.UpdateProvider;
+
+public interface JobTemplateMetaMapper {
+  String TABLE_NAME = "job_template_meta";
+
+  @InsertProvider(type = JobTemplateMetaSQLProviderFactory.class, method = 
"insertJobTemplateMeta")
+  void insertJobTemplateMeta(@Param("jobTemplateMeta") JobTemplatePO 
jobTemplatePO);
+
+  @InsertProvider(
+      type = JobTemplateMetaSQLProviderFactory.class,
+      method = "insertJobTemplateMetaOnDuplicateKeyUpdate")
+  void insertJobTemplateMetaOnDuplicateKeyUpdate(
+      @Param("jobTemplateMeta") JobTemplatePO jobTemplatePO);
+
+  @SelectProvider(
+      type = JobTemplateMetaSQLProviderFactory.class,
+      method = "listJobTemplatePOsByMetalake")
+  List<JobTemplatePO> listJobTemplatePOsByMetalake(@Param("metalakeName") 
String metalakeName);
+
+  @SelectProvider(
+      type = JobTemplateMetaSQLProviderFactory.class,
+      method = "selectJobTemplatePOByMetalakeAndName")
+  JobTemplatePO selectJobTemplatePOByMetalakeAndName(
+      @Param("metalakeName") String metalakeName, @Param("jobTemplateName") 
String jobTemplateName);
+
+  @UpdateProvider(
+      type = JobTemplateMetaSQLProviderFactory.class,
+      method = "softDeleteJobTemplateMetaByMetalakeAndName")
+  Integer softDeleteJobTemplateMetaByMetalakeAndName(
+      @Param("metalakeName") String metalakeName, @Param("jobTemplateName") 
String jobTemplateName);
+
+  @UpdateProvider(
+      type = JobTemplateMetaSQLProviderFactory.class,
+      method = "softDeleteJobTemplateMetasByMetalakeId")
+  void softDeleteJobTemplateMetasByMetalakeId(@Param("metalakeId") Long 
metalakeId);
+
+  @DeleteProvider(
+      type = JobTemplateMetaSQLProviderFactory.class,
+      method = "deleteJobTemplateMetasByLegacyTimeline")
+  Integer deleteJobTemplateMetasByLegacyTimeline(
+      @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit);
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/JobTemplateMetaSQLProviderFactory.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/JobTemplateMetaSQLProviderFactory.java
new file mode 100644
index 0000000000..64de88dc88
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/JobTemplateMetaSQLProviderFactory.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.mapper;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import org.apache.gravitino.storage.relational.JDBCBackend;
+import 
org.apache.gravitino.storage.relational.mapper.provider.base.JobTemplateMetaBaseSQLProvider;
+import 
org.apache.gravitino.storage.relational.mapper.provider.postgresql.JobTemplateMetaPostgreSQLProvider;
+import org.apache.gravitino.storage.relational.po.JobTemplatePO;
+import org.apache.gravitino.storage.relational.session.SqlSessionFactoryHelper;
+import org.apache.ibatis.annotations.Param;
+
+public class JobTemplateMetaSQLProviderFactory {
+
+  private static final Map<JDBCBackend.JDBCBackendType, 
JobTemplateMetaBaseSQLProvider>
+      JOB_TEMPLATE_META_SQL_PROVIDER_MAP =
+          ImmutableMap.of(
+              JDBCBackend.JDBCBackendType.MYSQL, new 
JobTemplateMetaMySQLProvider(),
+              JDBCBackend.JDBCBackendType.H2, new JobTemplateMetaH2Provider(),
+              JDBCBackend.JDBCBackendType.POSTGRESQL, new 
JobTemplateMetaPostgreSQLProvider());
+
+  public static JobTemplateMetaBaseSQLProvider getProvider() {
+    String databaseId =
+        SqlSessionFactoryHelper.getInstance()
+            .getSqlSessionFactory()
+            .getConfiguration()
+            .getDatabaseId();
+
+    JDBCBackend.JDBCBackendType jdbcBackendType =
+        JDBCBackend.JDBCBackendType.fromString(databaseId);
+    return JOB_TEMPLATE_META_SQL_PROVIDER_MAP.get(jdbcBackendType);
+  }
+
+  static class JobTemplateMetaMySQLProvider extends 
JobTemplateMetaBaseSQLProvider {}
+
+  static class JobTemplateMetaH2Provider extends 
JobTemplateMetaBaseSQLProvider {}
+
+  public static String insertJobTemplateMeta(
+      @Param("jobTemplateMeta") JobTemplatePO jobTemplatePO) {
+    return getProvider().insertJobTemplateMeta(jobTemplatePO);
+  }
+
+  public static String insertJobTemplateMetaOnDuplicateKeyUpdate(
+      @Param("jobTemplateMeta") JobTemplatePO jobTemplatePO) {
+    return 
getProvider().insertJobTemplateMetaOnDuplicateKeyUpdate(jobTemplatePO);
+  }
+
+  public static String listJobTemplatePOsByMetalake(@Param("metalakeName") 
String metalakeName) {
+    return getProvider().listJobTemplatePOsByMetalake(metalakeName);
+  }
+
+  public static String selectJobTemplatePOByMetalakeAndName(
+      @Param("metalakeName") String metalakeName,
+      @Param("jobTemplateName") String jobTemplateName) {
+    return getProvider().selectJobTemplatePOByMetalakeAndName(metalakeName, 
jobTemplateName);
+  }
+
+  public static String softDeleteJobTemplateMetaByMetalakeAndName(
+      @Param("metalakeName") String metalakeName,
+      @Param("jobTemplateName") String jobTemplateName) {
+    return 
getProvider().softDeleteJobTemplateMetaByMetalakeAndName(metalakeName, 
jobTemplateName);
+  }
+
+  public static String softDeleteJobTemplateMetasByMetalakeId(
+      @Param("metalakeId") Long metalakeId) {
+    return getProvider().softDeleteJobTemplateMetasByMetalakeId(metalakeId);
+  }
+
+  public static String deleteJobTemplateMetasByLegacyTimeline(
+      @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit) 
{
+    return 
getProvider().deleteJobTemplateMetasByLegacyTimeline(legacyTimeline, limit);
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/JobMetaBaseSQLProvider.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/JobMetaBaseSQLProvider.java
new file mode 100644
index 0000000000..03abcfcdc5
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/JobMetaBaseSQLProvider.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.mapper.provider.base;
+
+import org.apache.gravitino.storage.relational.mapper.JobMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.JobTemplateMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.MetalakeMetaMapper;
+import org.apache.gravitino.storage.relational.po.JobPO;
+import org.apache.ibatis.annotations.Param;
+
+public class JobMetaBaseSQLProvider {
+
+  public String insertJobMeta(@Param("jobMeta") JobPO jobPO) {
+    return "INSERT INTO "
+        + JobMetaMapper.TABLE_NAME
+        + " (job_run_id, job_template_id, metalake_id,"
+        + " job_execution_id, job_run_status, job_finished_at, audit_info, 
current_version,"
+        + " last_version, deleted_at)"
+        + " VALUES (#{jobMeta.jobRunId},"
+        + " (SELECT job_template_id FROM "
+        + JobTemplateMetaMapper.TABLE_NAME
+        + " WHERE job_template_name = #{jobMeta.jobTemplateName}"
+        + " AND metalake_id = #{jobMeta.metalakeId} AND deleted_at = 0),"
+        + " #{jobMeta.metalakeId}, #{jobMeta.jobExecutionId},"
+        + " #{jobMeta.jobRunStatus}, #{jobMeta.jobFinishedAt}, 
#{jobMeta.auditInfo},"
+        + " #{jobMeta.currentVersion}, #{jobMeta.lastVersion},"
+        + " #{jobMeta.deletedAt})";
+  }
+
+  public String insertJobMetaOnDuplicateKeyUpdate(@Param("jobMeta") JobPO 
jobPO) {
+    return "INSERT INTO "
+        + JobMetaMapper.TABLE_NAME
+        + " (job_run_id, job_template_id, metalake_id,"
+        + " job_execution_id, job_run_status, job_finished_at, audit_info, 
current_version,"
+        + " last_version, deleted_at)"
+        + " VALUES (#{jobMeta.jobRunId},"
+        + " (SELECT job_template_id FROM "
+        + JobTemplateMetaMapper.TABLE_NAME
+        + " WHERE job_template_name = #{jobMeta.jobTemplateName}"
+        + " AND metalake_id = #{jobMeta.metalakeId} AND deleted_at = 0),"
+        + " #{jobMeta.metalakeId}, #{jobMeta.jobExecutionId},"
+        + " #{jobMeta.jobRunStatus}, #{jobMeta.jobFinishedAt}, 
#{jobMeta.auditInfo},"
+        + " #{jobMeta.currentVersion}, #{jobMeta.lastVersion},"
+        + " #{jobMeta.deletedAt})"
+        + " ON DUPLICATE KEY UPDATE"
+        + " job_template_id = (SELECT job_template_id FROM "
+        + JobTemplateMetaMapper.TABLE_NAME
+        + " WHERE job_template_name = #{jobMeta.jobTemplateName}"
+        + " AND metalake_id = #{jobMeta.metalakeId} AND deleted_at = 0),"
+        + " metalake_id = #{jobMeta.metalakeId},"
+        + " job_execution_id = #{jobMeta.jobExecutionId},"
+        + " job_run_status = #{jobMeta.jobRunStatus},"
+        + " job_finished_at = #{jobMeta.jobFinishedAt},"
+        + " audit_info = #{jobMeta.auditInfo},"
+        + " current_version = #{jobMeta.currentVersion},"
+        + " last_version = #{jobMeta.lastVersion},"
+        + " deleted_at = #{jobMeta.deletedAt}";
+  }
+
+  public String listJobPOsByMetalake(@Param("metalakeName") String 
metalakeName) {
+    return "SELECT jrm.job_run_id AS jobRunId, jtm.job_template_name AS 
jobTemplateName,"
+        + " jrm.metalake_id AS metalakeId, jrm.job_execution_id AS 
jobExecutionId,"
+        + " jrm.job_run_status AS jobRunStatus, jrm.job_finished_at AS 
jobFinishedAt,"
+        + " jrm.audit_info AS auditInfo,"
+        + " jrm.current_version AS currentVersion, jrm.last_version AS 
lastVersion,"
+        + " jrm.deleted_at AS deletedAt"
+        + " FROM "
+        + JobMetaMapper.TABLE_NAME
+        + " jrm JOIN "
+        + JobTemplateMetaMapper.TABLE_NAME
+        + " jtm ON jrm.job_template_id = jtm.job_template_id"
+        + " JOIN "
+        + MetalakeMetaMapper.TABLE_NAME
+        + " mm ON jrm.metalake_id = mm.metalake_id"
+        + " WHERE mm.metalake_name = #{metalakeName} AND jrm.deleted_at = 0 
AND mm.deleted_at = 0"
+        + " AND jtm.deleted_at = 0";
+  }
+
+  public String listJobPOsByMetalakeAndTemplate(
+      @Param("metalakeName") String metalakeName,
+      @Param("jobTemplateName") String jobTemplateName) {
+    return "SELECT jrm.job_run_id AS jobRunId, jtm.job_template_name AS 
jobTemplateName,"
+        + " jrm.metalake_id AS metalakeId, jrm.job_execution_id AS 
jobExecutionId,"
+        + " jrm.job_run_status AS jobRunStatus, jrm.job_finished_at AS 
jobFinishedAt, "
+        + " jrm.audit_info AS auditInfo,"
+        + " jrm.current_version AS currentVersion, jrm.last_version AS 
lastVersion,"
+        + " jrm.deleted_at AS deletedAt"
+        + " FROM "
+        + JobMetaMapper.TABLE_NAME
+        + " jrm JOIN "
+        + JobTemplateMetaMapper.TABLE_NAME
+        + " jtm ON jrm.job_template_id = jtm.job_template_id"
+        + " JOIN "
+        + MetalakeMetaMapper.TABLE_NAME
+        + " mm ON jrm.metalake_id = mm.metalake_id"
+        + " WHERE mm.metalake_name = #{metalakeName} AND jtm.job_template_name 
= #{jobTemplateName}"
+        + " AND jrm.deleted_at = 0 AND mm.deleted_at = 0 AND jtm.deleted_at = 
0";
+  }
+
+  public String selectJobPOByMetalakeAndRunId(
+      @Param("metalakeName") String metalakeName, @Param("jobRunId") Long 
jobRunId) {
+    return "SELECT jrm.job_run_id AS jobRunId, jtm.job_template_name AS 
jobTemplateName,"
+        + " jrm.metalake_id AS metalakeId, jrm.job_execution_id AS 
jobExecutionId,"
+        + " jrm.job_run_status AS jobRunStatus, jrm.job_finished_at AS 
jobFinishedAt,"
+        + " jrm.audit_info AS auditInfo,"
+        + " jrm.current_version AS currentVersion, jrm.last_version AS 
lastVersion,"
+        + " jrm.deleted_at AS deletedAt"
+        + " FROM "
+        + JobMetaMapper.TABLE_NAME
+        + " jrm JOIN "
+        + JobTemplateMetaMapper.TABLE_NAME
+        + " jtm ON jrm.job_template_id = jtm.job_template_id"
+        + " JOIN "
+        + MetalakeMetaMapper.TABLE_NAME
+        + " mm ON jrm.metalake_id = mm.metalake_id"
+        + " WHERE mm.metalake_name = #{metalakeName} AND jrm.job_run_id = 
#{jobRunId}"
+        + " AND jrm.deleted_at = 0 AND mm.deleted_at = 0 AND jtm.deleted_at = 
0";
+  }
+
+  public String softDeleteJobMetaByMetalakeAndTemplate(
+      @Param("metalakeName") String metalakeName,
+      @Param("jobTemplateName") String jobTemplateName) {
+    return "UPDATE "
+        + JobMetaMapper.TABLE_NAME
+        + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)"
+        + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000.0"
+        + " WHERE metalake_id = ("
+        + " SELECT metalake_id FROM "
+        + MetalakeMetaMapper.TABLE_NAME
+        + " WHERE metalake_name = #{metalakeName} AND deleted_at = 0)"
+        + " AND job_template_id IN ("
+        + " SELECT job_template_id FROM "
+        + JobTemplateMetaMapper.TABLE_NAME
+        + " WHERE job_template_name = #{jobTemplateName} AND deleted_at = 0)"
+        + " AND deleted_at = 0";
+  }
+
+  public String softDeleteJobMetasByMetalakeId(@Param("metalakeId") Long 
metalakeId) {
+    return "UPDATE "
+        + JobMetaMapper.TABLE_NAME
+        + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)"
+        + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000.0"
+        + " WHERE metalake_id = #{metalakeId} AND deleted_at = 0";
+  }
+
+  public String softDeleteJobMetasByLegacyTimeline(@Param("legacyTimeline") 
Long legacyTimeline) {
+    return "UPDATE "
+        + JobMetaMapper.TABLE_NAME
+        + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)"
+        + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000.0"
+        + " WHERE job_finished_at < #{legacyTimeline} AND job_finished_at > 0";
+  }
+
+  public String deleteJobMetasByLegacyTimeline(
+      @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit) 
{
+    return "DELETE FROM "
+        + JobMetaMapper.TABLE_NAME
+        + " WHERE deleted_at < #{legacyTimeline} AND deleted_at > 0 LIMIT 
#{limit}";
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/JobTemplateMetaBaseSQLProvider.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/JobTemplateMetaBaseSQLProvider.java
new file mode 100644
index 0000000000..43111d3efd
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/JobTemplateMetaBaseSQLProvider.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.mapper.provider.base;
+
+import org.apache.gravitino.storage.relational.mapper.JobTemplateMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.MetalakeMetaMapper;
+import org.apache.gravitino.storage.relational.po.JobTemplatePO;
+import org.apache.ibatis.annotations.Param;
+
+public class JobTemplateMetaBaseSQLProvider {
+
+  public String insertJobTemplateMeta(@Param("jobTemplateMeta") JobTemplatePO 
jobTemplatePO) {
+    return "INSERT INTO "
+        + JobTemplateMetaMapper.TABLE_NAME
+        + " (job_template_id, job_template_name, metalake_id,"
+        + " job_template_comment, job_template_content, audit_info,"
+        + " current_version, last_version, deleted_at)"
+        + " VALUES (#{jobTemplateMeta.jobTemplateId}, 
#{jobTemplateMeta.jobTemplateName},"
+        + " #{jobTemplateMeta.metalakeId}, 
#{jobTemplateMeta.jobTemplateComment},"
+        + " #{jobTemplateMeta.jobTemplateContent}, 
#{jobTemplateMeta.auditInfo},"
+        + " #{jobTemplateMeta.currentVersion}, #{jobTemplateMeta.lastVersion},"
+        + " #{jobTemplateMeta.deletedAt})";
+  }
+
+  public String insertJobTemplateMetaOnDuplicateKeyUpdate(
+      @Param("jobTemplateMeta") JobTemplatePO jobTemplatePO) {
+    return "INSERT INTO "
+        + JobTemplateMetaMapper.TABLE_NAME
+        + " (job_template_id, job_template_name, metalake_id,"
+        + " job_template_comment, job_template_content, audit_info, 
current_version,"
+        + " last_version, deleted_at)"
+        + " VALUES (#{jobTemplateMeta.jobTemplateId}, 
#{jobTemplateMeta.jobTemplateName},"
+        + " #{jobTemplateMeta.metalakeId}, 
#{jobTemplateMeta.jobTemplateComment},"
+        + " #{jobTemplateMeta.jobTemplateContent}, 
#{jobTemplateMeta.auditInfo},"
+        + " #{jobTemplateMeta.currentVersion}, #{jobTemplateMeta.lastVersion},"
+        + " #{jobTemplateMeta.deletedAt})"
+        + " ON DUPLICATE KEY UPDATE"
+        + " job_template_name = #{jobTemplateMeta.jobTemplateName},"
+        + " metalake_id = #{jobTemplateMeta.metalakeId},"
+        + " job_template_comment = #{jobTemplateMeta.jobTemplateComment},"
+        + " job_template_content = #{jobTemplateMeta.jobTemplateContent},"
+        + " audit_info = #{jobTemplateMeta.auditInfo},"
+        + " current_version = #{jobTemplateMeta.currentVersion},"
+        + " last_version = #{jobTemplateMeta.lastVersion},"
+        + " deleted_at = #{jobTemplateMeta.deletedAt}";
+  }
+
+  public String listJobTemplatePOsByMetalake(@Param("metalakeName") String 
metalakeName) {
+    return "SELECT jtm.job_template_id AS jobTemplateId, jtm.job_template_name 
AS jobTemplateName,"
+        + " jtm.metalake_id AS metalakeId, jtm.job_template_comment AS 
jobTemplateComment,"
+        + " jtm.job_template_content AS jobTemplateContent, jtm.audit_info AS 
auditInfo,"
+        + " jtm.current_version AS currentVersion, jtm.last_version AS 
lastVersion,"
+        + " jtm.deleted_at AS deletedAt"
+        + " FROM "
+        + JobTemplateMetaMapper.TABLE_NAME
+        + " jtm JOIN "
+        + MetalakeMetaMapper.TABLE_NAME
+        + " mm ON jtm.metalake_id = mm.metalake_id"
+        + " WHERE mm.metalake_name = #{metalakeName} AND jtm.deleted_at = 0 
AND mm.deleted_at = 0";
+  }
+
+  public String selectJobTemplatePOByMetalakeAndName(
+      @Param("metalakeName") String metalakeName,
+      @Param("jobTemplateName") String jobTemplateName) {
+    return "SELECT jtm.job_template_id AS jobTemplateId, jtm.job_template_name 
AS jobTemplateName,"
+        + " jtm.metalake_id AS metalakeId, jtm.job_template_comment AS 
jobTemplateComment,"
+        + " jtm.job_template_content AS jobTemplateContent, jtm.audit_info AS 
auditInfo,"
+        + " jtm.current_version AS currentVersion, jtm.last_version AS 
lastVersion,"
+        + " jtm.deleted_at AS deletedAt"
+        + " FROM "
+        + JobTemplateMetaMapper.TABLE_NAME
+        + " jtm JOIN "
+        + MetalakeMetaMapper.TABLE_NAME
+        + " mm ON jtm.metalake_id = mm.metalake_id"
+        + " WHERE mm.metalake_name = #{metalakeName} AND jtm.job_template_name 
= #{jobTemplateName}"
+        + " AND jtm.deleted_at = 0 AND mm.deleted_at = 0";
+  }
+
+  public String softDeleteJobTemplateMetaByMetalakeAndName(
+      @Param("metalakeName") String metalakeName,
+      @Param("jobTemplateName") String jobTemplateName) {
+    return "UPDATE "
+        + JobTemplateMetaMapper.TABLE_NAME
+        + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)"
+        + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000.0"
+        + " WHERE job_template_name = #{jobTemplateName} AND metalake_id ="
+        + " (SELECT metalake_id FROM "
+        + MetalakeMetaMapper.TABLE_NAME
+        + " WHERE metalake_name = #{metalakeName} AND deleted_at = 0)"
+        + " AND deleted_at = 0";
+  }
+
+  public String softDeleteJobTemplateMetasByMetalakeId(@Param("metalakeId") 
Long metalakeId) {
+    return "UPDATE "
+        + JobTemplateMetaMapper.TABLE_NAME
+        + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)"
+        + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000.0"
+        + " WHERE metalake_id = #{metalakeId} AND deleted_at = 0";
+  }
+
+  public String deleteJobTemplateMetasByLegacyTimeline(
+      @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit) 
{
+    return "DELETE FROM "
+        + JobTemplateMetaMapper.TABLE_NAME
+        + " WHERE deleted_at < #{legacyTimeline} AND deleted_at > 0 LIMIT 
#{limit}";
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/JobMetaPostgreSQLProvider.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/JobMetaPostgreSQLProvider.java
new file mode 100644
index 0000000000..56e667d274
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/JobMetaPostgreSQLProvider.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.mapper.provider.postgresql;
+
+import org.apache.gravitino.storage.relational.mapper.JobMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.JobTemplateMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.MetalakeMetaMapper;
+import 
org.apache.gravitino.storage.relational.mapper.provider.base.JobMetaBaseSQLProvider;
+import org.apache.gravitino.storage.relational.po.JobPO;
+import org.apache.ibatis.annotations.Param;
+
+public class JobMetaPostgreSQLProvider extends JobMetaBaseSQLProvider {
+
+  @Override
+  public String insertJobMetaOnDuplicateKeyUpdate(@Param("jobMeta") JobPO 
jobPO) {
+    return "INSERT INTO "
+        + JobMetaMapper.TABLE_NAME
+        + " (job_run_id, job_template_id, metalake_id,"
+        + " job_execution_id, job_run_status, job_finished_at, audit_info, 
current_version,"
+        + " last_version, deleted_at)"
+        + " VALUES (#{jobMeta.jobRunId},"
+        + " (SELECT job_template_id FROM "
+        + JobTemplateMetaMapper.TABLE_NAME
+        + " WHERE job_template_name = #{jobMeta.jobTemplateName}"
+        + " AND metalake_id = #{jobMeta.metalakeId} AND deleted_at = 0),"
+        + " #{jobMeta.metalakeId}, #{jobMeta.jobExecutionId},"
+        + " #{jobMeta.jobRunStatus}, #{jobMeta.jobFinished}, 
#{jobMeta.auditInfo},"
+        + " #{jobMeta.currentVersion}, #{jobMeta.lastVersion},"
+        + " #{jobMeta.deletedAt})"
+        + " ON CONFLICT (job_run_id) DO UPDATE SET"
+        + " job_template_id = (SELECT job_template_id FROM "
+        + JobTemplateMetaMapper.TABLE_NAME
+        + " WHERE job_template_name = #{jobMeta.jobTemplateName}"
+        + " AND metalake_id = #{jobMeta.metalakeId} AND deleted_at = 0),"
+        + " metalake_id = #{jobMeta.metalakeId},"
+        + " job_execution_id = #{jobMeta.jobExecutionId},"
+        + " job_run_status = #{jobMeta.jobRunStatus},"
+        + " job_finished_at = #{jobMeta.jobFinishedAt},"
+        + " audit_info = #{jobMeta.auditInfo},"
+        + " current_version = #{jobMeta.currentVersion},"
+        + " last_version = #{jobMeta.lastVersion},"
+        + " deleted_at = #{jobMeta.deletedAt}";
+  }
+
+  @Override
+  public String softDeleteJobMetaByMetalakeAndTemplate(
+      @Param("metalakeName") String metalakeName,
+      @Param("jobTemplateName") String jobTemplateName) {
+    return "UPDATE "
+        + JobMetaMapper.TABLE_NAME
+        + " SET deleted_at = floor(extract(epoch from((current_timestamp -"
+        + " timestamp '1970-01-01 00:00:00')*1000))) "
+        + " WHERE metalake_id IN ("
+        + " SELECT metalake_id FROM "
+        + MetalakeMetaMapper.TABLE_NAME
+        + " WHERE metalake_name = #{metalakeName} AND deleted_at = 0)"
+        + " AND job_template_id IN ("
+        + " SELECT job_template_id FROM "
+        + JobTemplateMetaMapper.TABLE_NAME
+        + " WHERE job_template_name = #{jobTemplateName} AND deleted_at = 0)"
+        + " AND deleted_at = 0";
+  }
+
+  @Override
+  public String softDeleteJobMetasByMetalakeId(@Param("metalakeId") Long 
metalakeId) {
+    return "UPDATE "
+        + JobMetaMapper.TABLE_NAME
+        + " SET deleted_at = floor(extract(epoch from((current_timestamp -"
+        + " timestamp '1970-01-01 00:00:00')*1000))) "
+        + " WHERE metalake_id = #{metalakeId} AND deleted_at = 0";
+  }
+
+  @Override
+  public String softDeleteJobMetasByLegacyTimeline(@Param("legacyTimeline") 
Long legacyTimeline) {
+    return "UPDATE "
+        + JobMetaMapper.TABLE_NAME
+        + " SET deleted_at = floor(extract(epoch from((current_timestamp -"
+        + " timestamp '1970-01-01 00:00:00')*1000))) "
+        + " WHERE job_finished_at < #{legacyTimeline} AND job_finished_at > 0";
+  }
+
+  @Override
+  public String deleteJobMetasByLegacyTimeline(
+      @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit) 
{
+    return "DELETE FROM "
+        + JobMetaMapper.TABLE_NAME
+        + " WHERE job_run_id IN (SELECT job_run_id FROM "
+        + JobMetaMapper.TABLE_NAME
+        + " WHERE deleted_at < #{legacyTimeline} AND deleted_at > 0 LIMIT 
#{limit})";
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/JobTemplateMetaPostgreSQLProvider.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/JobTemplateMetaPostgreSQLProvider.java
new file mode 100644
index 0000000000..e0c9557319
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/JobTemplateMetaPostgreSQLProvider.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.mapper.provider.postgresql;
+
+import org.apache.gravitino.storage.relational.mapper.JobTemplateMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.MetalakeMetaMapper;
+import 
org.apache.gravitino.storage.relational.mapper.provider.base.JobTemplateMetaBaseSQLProvider;
+import org.apache.gravitino.storage.relational.po.JobTemplatePO;
+import org.apache.ibatis.annotations.Param;
+
+public class JobTemplateMetaPostgreSQLProvider extends 
JobTemplateMetaBaseSQLProvider {
+
+  @Override
+  public String softDeleteJobTemplateMetaByMetalakeAndName(
+      @Param("metalakeName") String metalakeName,
+      @Param("jobTemplateName") String jobTemplateName) {
+    return "UPDATE "
+        + JobTemplateMetaMapper.TABLE_NAME
+        + " SET deleted_at = floor(extract(epoch from((current_timestamp -"
+        + " timestamp '1970-01-01 00:00:00')*1000))) "
+        + " WHERE metalake_id IN ("
+        + " SELECT metalake_id FROM "
+        + MetalakeMetaMapper.TABLE_NAME
+        + " WHERE metalake_name = #{metalakeName} AND deleted_at = 0)"
+        + " AND job_template_name = #{jobTemplateName} AND deleted_at = 0";
+  }
+
+  @Override
+  public String softDeleteJobTemplateMetasByMetalakeId(@Param("metalakeId") 
Long metalakeId) {
+    return "UPDATE "
+        + JobTemplateMetaMapper.TABLE_NAME
+        + " SET deleted_at = floor(extract(epoch from((current_timestamp -"
+        + " timestamp '1970-01-01 00:00:00')*1000))) "
+        + " WHERE metalake_id = #{metalakeId} AND deleted_at = 0";
+  }
+
+  @Override
+  public String insertJobTemplateMetaOnDuplicateKeyUpdate(
+      @Param("jobTemplateMeta") JobTemplatePO jobTemplatePO) {
+    return "INSERT INTO "
+        + JobTemplateMetaMapper.TABLE_NAME
+        + " (job_template_id, job_template_name, metalake_id, 
job_template_comment, "
+        + " job_template_content, audit_info, current_version, last_version, 
deleted_at)"
+        + " VALUES (#{jobTemplateMeta.jobTemplateId}, 
#{jobTemplateMeta.jobTemplateName},"
+        + " #{jobTemplateMeta.metalakeId}, 
#{jobTemplateMeta.jobTemplateComment},"
+        + " #{jobTemplateMeta.jobTemplateContent}, 
#{jobTemplateMeta.auditInfo},"
+        + " #{jobTemplateMeta.currentVersion}, #{jobTemplateMeta.lastVersion},"
+        + " #{jobTemplateMeta.deletedAt})"
+        + " ON CONFLICT(job_template_id) DO UPDATE SET"
+        + " job_template_name = #{jobTemplateMeta.jobTemplateName},"
+        + " metalake_id = #{jobTemplateMeta.metalakeId},"
+        + " job_template_comment = #{jobTemplateMeta.jobTemplateComment},"
+        + " job_template_content = #{jobTemplateMeta.jobTemplateContent},"
+        + " audit_info = #{jobTemplateMeta.auditInfo},"
+        + " current_version = #{jobTemplateMeta.currentVersion},"
+        + " last_version = #{jobTemplateMeta.lastVersion},"
+        + " deleted_at = #{jobTemplateMeta.deletedAt}";
+  }
+
+  @Override
+  public String deleteJobTemplateMetasByLegacyTimeline(Long legacyTimeline, 
int limit) {
+    return "DELETE FROM "
+        + JobTemplateMetaMapper.TABLE_NAME
+        + " WHERE job_template_id IN (SELECT job_template_id FROM "
+        + JobTemplateMetaMapper.TABLE_NAME
+        + " WHERE deleted_at < #{legacyTimeline} AND deleted_at > 0 LIMIT 
#{limit})";
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/po/JobPO.java 
b/core/src/main/java/org/apache/gravitino/storage/relational/po/JobPO.java
new file mode 100644
index 0000000000..236a1eb6c3
--- /dev/null
+++ b/core/src/main/java/org/apache/gravitino/storage/relational/po/JobPO.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.po;
+
+import static 
org.apache.gravitino.storage.relational.utils.POConverters.DEFAULT_DELETED_AT;
+import static 
org.apache.gravitino.storage.relational.utils.POConverters.INIT_VERSION;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.base.Preconditions;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.ToString;
+import lombok.experimental.Accessors;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.job.JobHandle;
+import org.apache.gravitino.json.JsonUtils;
+import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.meta.JobEntity;
+
+@Getter
+@Accessors(fluent = true)
+@EqualsAndHashCode
+@ToString
+public class JobPO {
+
+  private Long jobRunId;
+  private String jobTemplateName;
+  private Long metalakeId;
+  private String jobExecutionId;
+  private String jobRunStatus;
+  private Long jobFinishedAt;
+  private String auditInfo;
+  private Long currentVersion;
+  private Long lastVersion;
+  private Long deletedAt;
+
+  public JobPO() {
+    // Default constructor for JPA
+  }
+
+  @lombok.Builder(setterPrefix = "with")
+  private JobPO(
+      Long jobRunId,
+      String jobTemplateName,
+      Long metalakeId,
+      String jobExecutionId,
+      String jobRunStatus,
+      Long jobFinishedAt,
+      String auditInfo,
+      Long currentVersion,
+      Long lastVersion,
+      Long deletedAt) {
+    Preconditions.checkArgument(jobRunId != null, "jobRunId cannot be null");
+    Preconditions.checkArgument(
+        StringUtils.isNotBlank(jobTemplateName), "jobTemplateName cannot be 
blank");
+    Preconditions.checkArgument(metalakeId != null, "metalakeId cannot be 
null");
+    Preconditions.checkArgument(
+        StringUtils.isNotBlank(jobExecutionId), "jobExecutionId cannot be 
blank");
+    Preconditions.checkArgument(
+        StringUtils.isNotBlank(jobRunStatus), "jobRunStatus cannot be blank");
+    Preconditions.checkArgument(jobFinishedAt != null, "jobFinishedAt cannot 
be null");
+    Preconditions.checkArgument(StringUtils.isNotBlank(auditInfo), "auditInfo 
cannot be blank");
+    Preconditions.checkArgument(currentVersion != null, "currentVersion cannot 
be null");
+    Preconditions.checkArgument(lastVersion != null, "lastVersion cannot be 
null");
+    Preconditions.checkArgument(deletedAt != null, "deletedAt cannot be null");
+
+    this.jobRunId = jobRunId;
+    this.jobTemplateName = jobTemplateName;
+    this.metalakeId = metalakeId;
+    this.jobExecutionId = jobExecutionId;
+    this.jobRunStatus = jobRunStatus;
+    this.jobFinishedAt = jobFinishedAt;
+    this.auditInfo = auditInfo;
+    this.currentVersion = currentVersion;
+    this.lastVersion = lastVersion;
+    this.deletedAt = deletedAt;
+  }
+
+  public static class JobPOBuilder {
+    // Builder class for JobPO
+    // Lombok will generate the builder methods based on the fields defined in 
JobPO
+  }
+
+  public static JobPO initializeJobPO(JobEntity jobEntity, JobPOBuilder 
builder) {
+    // We should not keep the terminated job entities in the database forever, 
so we set the
+    // current time as the finished timestamp if the job is in a terminal 
state,
+    // So the entity GC cleaner will clean it up later.
+    long finished = DEFAULT_DELETED_AT;
+    if (jobEntity.status() == JobHandle.Status.CANCELLED
+        || jobEntity.status() == JobHandle.Status.FAILED
+        || jobEntity.status() == JobHandle.Status.SUCCEEDED) {
+      finished = System.currentTimeMillis();
+    }
+
+    try {
+      return builder
+          .withJobRunId(jobEntity.id())
+          .withJobTemplateName(jobEntity.jobTemplateName())
+          .withJobExecutionId(jobEntity.jobExecutionId())
+          .withJobRunStatus(jobEntity.status().name())
+          .withJobFinishedAt(finished)
+          
.withAuditInfo(JsonUtils.anyFieldMapper().writeValueAsString(jobEntity.auditInfo()))
+          .withCurrentVersion(INIT_VERSION)
+          .withLastVersion(INIT_VERSION)
+          .withDeletedAt(DEFAULT_DELETED_AT)
+          .build();
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException("Failed to serialize job entity", e);
+    }
+  }
+
+  public static JobEntity fromJobPO(JobPO jobPO, Namespace namespace) {
+    try {
+      return JobEntity.builder()
+          .withId(jobPO.jobRunId)
+          .withJobExecutionId(jobPO.jobExecutionId)
+          .withNamespace(namespace)
+          .withStatus(JobHandle.Status.valueOf(jobPO.jobRunStatus))
+          .withJobTemplateName(jobPO.jobTemplateName)
+          .withAuditInfo(JsonUtils.anyFieldMapper().readValue(jobPO.auditInfo, 
AuditInfo.class))
+          .build();
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException("Failed to deserialize job PO", e);
+    }
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/po/JobTemplatePO.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/po/JobTemplatePO.java
new file mode 100644
index 0000000000..6efd8fb141
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/po/JobTemplatePO.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.po;
+
+import static 
org.apache.gravitino.storage.relational.utils.POConverters.DEFAULT_DELETED_AT;
+import static 
org.apache.gravitino.storage.relational.utils.POConverters.INIT_VERSION;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.base.Preconditions;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.ToString;
+import lombok.experimental.Accessors;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.json.JsonUtils;
+import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.meta.JobTemplateEntity;
+
+@Getter
+@Accessors(fluent = true)
+@EqualsAndHashCode
+@ToString
+public class JobTemplatePO {
+
+  private Long jobTemplateId;
+  private String jobTemplateName;
+  private Long metalakeId;
+  private String jobTemplateComment;
+  private String jobTemplateContent;
+  private String auditInfo;
+  private Long currentVersion;
+  private Long lastVersion;
+  private Long deletedAt;
+
+  public JobTemplatePO() {
+    // Default constructor for JPA
+  }
+
+  @lombok.Builder(setterPrefix = "with")
+  private JobTemplatePO(
+      Long jobTemplateId,
+      String jobTemplateName,
+      Long metalakeId,
+      String jobTemplateComment,
+      String jobTemplateContent,
+      String auditInfo,
+      Long currentVersion,
+      Long lastVersion,
+      Long deletedAt) {
+    Preconditions.checkArgument(jobTemplateId != null, "jobTemplateId cannot 
be null");
+    Preconditions.checkArgument(
+        StringUtils.isNotBlank(jobTemplateName), "jobTemplateName cannot be 
blank");
+    Preconditions.checkArgument(metalakeId != null, "metalakeId cannot be 
null");
+    Preconditions.checkArgument(
+        StringUtils.isNotBlank(jobTemplateContent), "jobTemplateContent cannot 
be blank");
+    Preconditions.checkArgument(StringUtils.isNotBlank(auditInfo), "auditInfo 
cannot be blank");
+    Preconditions.checkArgument(currentVersion != null, "currentVersion cannot 
be null");
+    Preconditions.checkArgument(lastVersion != null, "lastVersion cannot be 
null");
+    Preconditions.checkArgument(deletedAt != null, "deletedAt cannot be null");
+
+    this.jobTemplateId = jobTemplateId;
+    this.jobTemplateName = jobTemplateName;
+    this.metalakeId = metalakeId;
+    this.jobTemplateComment = jobTemplateComment;
+    this.jobTemplateContent = jobTemplateContent;
+    this.auditInfo = auditInfo;
+    this.currentVersion = currentVersion;
+    this.lastVersion = lastVersion;
+    this.deletedAt = deletedAt;
+  }
+
+  public static class JobTemplatePOBuilder {
+    // Builder class for JobTemplatePO
+    // Lombok will generate the builder methods based on the fields defined in 
JobTemplatePO
+  }
+
+  public static JobTemplatePO initializeJobTemplatePO(
+      JobTemplateEntity jobTemplateEntity, JobTemplatePOBuilder builder) {
+    try {
+      return builder
+          .withJobTemplateId(jobTemplateEntity.id())
+          .withJobTemplateName(jobTemplateEntity.name())
+          .withJobTemplateComment(jobTemplateEntity.comment())
+          .withJobTemplateContent(
+              
JsonUtils.anyFieldMapper().writeValueAsString(jobTemplateEntity.templateContent()))
+          .withAuditInfo(
+              
JsonUtils.anyFieldMapper().writeValueAsString(jobTemplateEntity.auditInfo()))
+          .withCurrentVersion(INIT_VERSION)
+          .withLastVersion(INIT_VERSION)
+          .withDeletedAt(DEFAULT_DELETED_AT)
+          .build();
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException("Failed to serialize job template entity", e);
+    }
+  }
+
+  public static JobTemplateEntity fromJobTemplatePO(
+      JobTemplatePO jobTemplatePO, Namespace namespace) {
+    try {
+      return JobTemplateEntity.builder()
+          .withId(jobTemplatePO.jobTemplateId())
+          .withName(jobTemplatePO.jobTemplateName())
+          .withNamespace(namespace)
+          .withComment(jobTemplatePO.jobTemplateComment())
+          .withTemplateContent(
+              JsonUtils.anyFieldMapper()
+                  .readValue(
+                      jobTemplatePO.jobTemplateContent(), 
JobTemplateEntity.TemplateContent.class))
+          .withAuditInfo(
+              JsonUtils.anyFieldMapper().readValue(jobTemplatePO.auditInfo(), 
AuditInfo.class))
+          .build();
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException("Failed to deserialize job template PO", e);
+    }
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/JobMetaService.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/JobMetaService.java
new file mode 100644
index 0000000000..ebc9c70760
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/JobMetaService.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.service;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Locale;
+import java.util.stream.Collectors;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.exceptions.NoSuchEntityException;
+import org.apache.gravitino.job.JobHandle;
+import org.apache.gravitino.meta.JobEntity;
+import org.apache.gravitino.storage.relational.mapper.JobMetaMapper;
+import org.apache.gravitino.storage.relational.po.JobPO;
+import org.apache.gravitino.storage.relational.utils.ExceptionUtils;
+import org.apache.gravitino.storage.relational.utils.SessionUtils;
+import org.apache.gravitino.utils.NamespaceUtil;
+
+public class JobMetaService {
+
+  private static final JobMetaService INSTANCE = new JobMetaService();
+
+  private JobMetaService() {
+    // Private constructor to prevent instantiation
+  }
+
+  public static JobMetaService getInstance() {
+    return INSTANCE;
+  }
+
+  public List<JobEntity> listJobsByNamespace(Namespace ns) {
+    String metalakeName = ns.level(0);
+    List<JobPO> jobPOs =
+        SessionUtils.getWithoutCommit(
+            JobMetaMapper.class, mapper -> 
mapper.listJobPOsByMetalake(metalakeName));
+    return jobPOs.stream().map(po -> JobPO.fromJobPO(po, 
ns)).collect(Collectors.toList());
+  }
+
+  public List<JobEntity> listJobsByTemplateIdent(NameIdentifier 
jobTemplateIdent) {
+    String metalakeName = jobTemplateIdent.namespace().level(0);
+    String jobTemplateName = jobTemplateIdent.name();
+    List<JobPO> jobPOs =
+        SessionUtils.getWithoutCommit(
+            JobMetaMapper.class,
+            mapper -> mapper.listJobPOsByMetalakeAndTemplate(metalakeName, 
jobTemplateName));
+    return jobPOs.stream()
+        .map(po -> JobPO.fromJobPO(po, NamespaceUtil.ofJob(metalakeName)))
+        .collect(Collectors.toList());
+  }
+
+  public JobEntity getJobByIdentifier(NameIdentifier ident) {
+    String metalakeName = ident.namespace().level(0);
+    String jobRunId = ident.name();
+    Long jobRunIdLong;
+    try {
+      jobRunIdLong = 
Long.parseLong(jobRunId.substring(JobHandle.JOB_ID_PREFIX.length()));
+    } catch (NumberFormatException e) {
+      throw new NoSuchEntityException("Invalid job run ID format %s", 
jobRunId);
+    }
+
+    JobPO jobPO =
+        SessionUtils.getWithoutCommit(
+            JobMetaMapper.class,
+            mapper -> mapper.selectJobPOByMetalakeAndRunId(metalakeName, 
jobRunIdLong));
+    if (jobPO == null) {
+      throw new NoSuchEntityException(
+          NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
+          Entity.EntityType.JOB.name().toLowerCase(Locale.ROOT),
+          jobRunId);
+    }
+    return JobPO.fromJobPO(jobPO, ident.namespace());
+  }
+
+  public void insertJob(JobEntity jobEntity, boolean overwrite) throws 
IOException {
+    String metalakeName = jobEntity.namespace().level(0);
+
+    try {
+      Long metalakeId = 
MetalakeMetaService.getInstance().getMetalakeIdByName(metalakeName);
+
+      JobPO.JobPOBuilder builder = JobPO.builder().withMetalakeId(metalakeId);
+      JobPO jobPO = JobPO.initializeJobPO(jobEntity, builder);
+
+      SessionUtils.doWithCommit(
+          JobMetaMapper.class,
+          mapper -> {
+            if (overwrite) {
+              mapper.insertJobMetaOnDuplicateKeyUpdate(jobPO);
+            } else {
+              mapper.insertJobMeta(jobPO);
+            }
+          });
+    } catch (RuntimeException e) {
+      ExceptionUtils.checkSQLException(e, Entity.EntityType.JOB, 
jobEntity.id().toString());
+    }
+  }
+
+  public int deleteJobsByLegacyTimeline(long legacyTimeline, int limit) {
+    // Mark jobs as deleted for finished jobs, so that they can be cleaned up 
later
+    SessionUtils.doWithCommit(
+        JobMetaMapper.class, mapper -> 
mapper.softDeleteJobMetasByLegacyTimeline(legacyTimeline));
+
+    return SessionUtils.doWithCommitAndFetchResult(
+        JobMetaMapper.class,
+        mapper -> mapper.deleteJobMetasByLegacyTimeline(legacyTimeline, 
limit));
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/JobTemplateMetaService.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/JobTemplateMetaService.java
new file mode 100644
index 0000000000..c8c61cb51f
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/JobTemplateMetaService.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.service;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Locale;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.exceptions.NoSuchEntityException;
+import org.apache.gravitino.meta.JobTemplateEntity;
+import org.apache.gravitino.storage.relational.mapper.JobMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.JobTemplateMetaMapper;
+import org.apache.gravitino.storage.relational.po.JobTemplatePO;
+import org.apache.gravitino.storage.relational.utils.ExceptionUtils;
+import org.apache.gravitino.storage.relational.utils.SessionUtils;
+
+public class JobTemplateMetaService {
+
+  private static final JobTemplateMetaService INSTANCE = new 
JobTemplateMetaService();
+
+  private JobTemplateMetaService() {
+    // Private constructor to prevent instantiation
+  }
+
+  public static JobTemplateMetaService getInstance() {
+    return INSTANCE;
+  }
+
+  public List<JobTemplateEntity> listJobTemplatesByNamespace(Namespace ns) {
+    String metalakeName = ns.level(0);
+    List<JobTemplatePO> jobTemplatePOs =
+        SessionUtils.getWithoutCommit(
+            JobTemplateMetaMapper.class,
+            mapper -> mapper.listJobTemplatePOsByMetalake(metalakeName));
+
+    return jobTemplatePOs.stream()
+        .map(p -> JobTemplatePO.fromJobTemplatePO(p, ns))
+        .collect(Collectors.toList());
+  }
+
+  public JobTemplateEntity getJobTemplateByIdentifier(NameIdentifier 
jobTemplateIdent) {
+    String metalakeName = jobTemplateIdent.namespace().level(0);
+    String jobTemplateName = jobTemplateIdent.name();
+
+    JobTemplatePO jobTemplatePO =
+        SessionUtils.getWithoutCommit(
+            JobTemplateMetaMapper.class,
+            mapper -> 
mapper.selectJobTemplatePOByMetalakeAndName(metalakeName, jobTemplateName));
+
+    if (jobTemplatePO == null) {
+      throw new NoSuchEntityException(
+          NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
+          Entity.EntityType.JOB_TEMPLATE.name().toLowerCase(Locale.ROOT),
+          jobTemplateName);
+    }
+
+    return JobTemplatePO.fromJobTemplatePO(jobTemplatePO, 
jobTemplateIdent.namespace());
+  }
+
+  public void insertJobTemplate(JobTemplateEntity jobTemplateEntity, boolean 
overwrite)
+      throws IOException {
+    String metalakeName = jobTemplateEntity.namespace().level(0);
+
+    try {
+      Long metalakeId = 
MetalakeMetaService.getInstance().getMetalakeIdByName(metalakeName);
+
+      JobTemplatePO.JobTemplatePOBuilder builder =
+          JobTemplatePO.builder().withMetalakeId(metalakeId);
+      JobTemplatePO jobTemplatePO =
+          JobTemplatePO.initializeJobTemplatePO(jobTemplateEntity, builder);
+
+      SessionUtils.doWithCommit(
+          JobTemplateMetaMapper.class,
+          mapper -> {
+            if (overwrite) {
+              mapper.insertJobTemplateMetaOnDuplicateKeyUpdate(jobTemplatePO);
+            } else {
+              mapper.insertJobTemplateMeta(jobTemplatePO);
+            }
+          });
+    } catch (RuntimeException e) {
+      ExceptionUtils.checkSQLException(e, Entity.EntityType.JOB_TEMPLATE, 
jobTemplateEntity.name());
+      throw e;
+    }
+  }
+
+  public boolean deleteJobTemplate(NameIdentifier jobTemplateIdent) {
+    String metalakeName = jobTemplateIdent.namespace().level(0);
+    String jobTemplateName = jobTemplateIdent.name();
+
+    AtomicInteger result = new AtomicInteger(0);
+    SessionUtils.doMultipleWithCommit(
+        () ->
+            SessionUtils.doWithoutCommit(
+                JobMetaMapper.class,
+                mapper ->
+                    
mapper.softDeleteJobMetaByMetalakeAndTemplate(metalakeName, jobTemplateName)),
+        () ->
+            result.set(
+                SessionUtils.doWithoutCommitAndFetchResult(
+                    JobTemplateMetaMapper.class,
+                    mapper ->
+                        mapper.softDeleteJobTemplateMetaByMetalakeAndName(
+                            metalakeName, jobTemplateName))));
+    return result.get() > 0;
+  }
+
+  public int deleteJobTemplatesByLegacyTimeline(long legacyTimeline, int 
limit) {
+    return SessionUtils.doWithCommitAndFetchResult(
+        JobTemplateMetaMapper.class,
+        mapper -> 
mapper.deleteJobTemplateMetasByLegacyTimeline(legacyTimeline, limit));
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/MetalakeMetaService.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/MetalakeMetaService.java
index ac7b32306b..6c2b6b7c57 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/MetalakeMetaService.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/MetalakeMetaService.java
@@ -36,6 +36,8 @@ import 
org.apache.gravitino.storage.relational.mapper.FilesetMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.FilesetVersionMapper;
 import org.apache.gravitino.storage.relational.mapper.GroupMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.GroupRoleRelMapper;
+import org.apache.gravitino.storage.relational.mapper.JobMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.JobTemplateMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.MetalakeMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.ModelMetaMapper;
 import 
org.apache.gravitino.storage.relational.mapper.ModelVersionAliasRelMapper;
@@ -268,7 +270,15 @@ public class MetalakeMetaService {
             () ->
                 SessionUtils.doWithoutCommit(
                     ModelMetaMapper.class,
-                    mapper -> 
mapper.softDeleteModelMetasByMetalakeId(metalakeId)));
+                    mapper -> 
mapper.softDeleteModelMetasByMetalakeId(metalakeId)),
+            () ->
+                SessionUtils.doWithoutCommit(
+                    JobTemplateMetaMapper.class,
+                    mapper -> 
mapper.softDeleteJobTemplateMetasByMetalakeId(metalakeId)),
+            () ->
+                SessionUtils.doWithoutCommit(
+                    JobMetaMapper.class,
+                    mapper -> 
mapper.softDeleteJobMetasByMetalakeId(metalakeId)));
       } else {
         List<CatalogEntity> catalogEntities =
             CatalogMetaService.getInstance()
@@ -317,7 +327,15 @@ public class MetalakeMetaService {
             () ->
                 SessionUtils.doWithoutCommit(
                     OwnerMetaMapper.class,
-                    mapper -> 
mapper.softDeleteOwnerRelByMetalakeId(metalakeId)));
+                    mapper -> 
mapper.softDeleteOwnerRelByMetalakeId(metalakeId)),
+            () ->
+                SessionUtils.doWithoutCommit(
+                    JobTemplateMetaMapper.class,
+                    mapper -> 
mapper.softDeleteJobTemplateMetasByMetalakeId(metalakeId)),
+            () ->
+                SessionUtils.doWithoutCommit(
+                    JobMetaMapper.class,
+                    mapper -> 
mapper.softDeleteJobMetasByMetalakeId(metalakeId)));
       }
     }
     return true;
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/utils/POConverters.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/utils/POConverters.java
index b295ddf110..7d00cbd947 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/utils/POConverters.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/utils/POConverters.java
@@ -93,8 +93,8 @@ import org.apache.gravitino.utils.PrincipalUtils;
 
 /** POConverters is a utility class to convert PO to Base and vice versa. */
 public class POConverters {
-  private static final long INIT_VERSION = 1L;
-  private static final long DEFAULT_DELETED_AT = 0L;
+  public static final long INIT_VERSION = 1L;
+  public static final long DEFAULT_DELETED_AT = 0L;
 
   private POConverters() {}
 
diff --git 
a/core/src/test/java/org/apache/gravitino/storage/relational/po/TestJobPO.java 
b/core/src/test/java/org/apache/gravitino/storage/relational/po/TestJobPO.java
new file mode 100644
index 0000000000..3fd8f9988b
--- /dev/null
+++ 
b/core/src/test/java/org/apache/gravitino/storage/relational/po/TestJobPO.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.po;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import java.time.Instant;
+import org.apache.gravitino.job.JobHandle;
+import org.apache.gravitino.job.JobTemplate;
+import org.apache.gravitino.job.ShellJobTemplate;
+import org.apache.gravitino.job.SparkJobTemplate;
+import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.meta.JobEntity;
+import org.apache.gravitino.meta.JobTemplateEntity;
+import org.apache.gravitino.utils.NamespaceUtil;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class TestJobPO {
+
+  @Test
+  public void testJobTemplatePO() {
+    JobTemplate shellJobTemplate =
+        ShellJobTemplate.builder()
+            .withName("shell-job-template")
+            .withComment("This is a shell job template")
+            .withExecutable("/bin/echo")
+            .withArguments(Lists.newArrayList("Hello, World!"))
+            .withEnvironments(ImmutableMap.of("ENV_VAR", "value"))
+            .withScripts(Lists.newArrayList("/path/to/script.sh"))
+            .build();
+
+    JobTemplateEntity shellTemplateEntity =
+        JobTemplateEntity.builder()
+            .withName(shellJobTemplate.name())
+            .withTemplateContent(
+                
JobTemplateEntity.TemplateContent.fromJobTemplate(shellJobTemplate))
+            .withComment(shellJobTemplate.comment())
+            .withNamespace(NamespaceUtil.ofJobTemplate("test"))
+            .withId(1L)
+            .withAuditInfo(
+                
AuditInfo.builder().withCreator("test").withCreateTime(Instant.now()).build())
+            .build();
+
+    JobTemplatePO.JobTemplatePOBuilder builder = 
JobTemplatePO.builder().withMetalakeId(1L);
+
+    JobTemplatePO jobTemplatePO =
+        JobTemplatePO.initializeJobTemplatePO(shellTemplateEntity, builder);
+    JobTemplateEntity resultEntity =
+        JobTemplatePO.fromJobTemplatePO(jobTemplatePO, 
NamespaceUtil.ofJobTemplate("test"));
+
+    Assertions.assertEquals(shellTemplateEntity.name(), resultEntity.name());
+    Assertions.assertEquals(shellTemplateEntity.templateContent(), 
resultEntity.templateContent());
+    Assertions.assertEquals(shellTemplateEntity.comment(), 
resultEntity.comment());
+    Assertions.assertEquals(shellTemplateEntity.namespace(), 
resultEntity.namespace());
+    Assertions.assertEquals(shellTemplateEntity.id(), resultEntity.id());
+    Assertions.assertEquals(
+        shellTemplateEntity.auditInfo().creator(), 
resultEntity.auditInfo().creator());
+
+    JobTemplate sparkJobTemplate =
+        SparkJobTemplate.builder()
+            .withName("spark-job-template")
+            .withComment("This is a spark job template")
+            .withExecutable("/path/to/spark-demo.jar")
+            .withClassName("org.example.SparkDemo")
+            .withArguments(
+                Lists.newArrayList("--input", "/path/to/input", "--output", 
"/path/to/output"))
+            .withEnvironments(ImmutableMap.of("SPARK_ENV_VAR", "value"))
+            .withConfigs(ImmutableMap.of("spark.executor.memory", "2g"))
+            .withJars(Lists.newArrayList("/path/to/dependency.jar"))
+            .withFiles(Lists.newArrayList("/path/to/config.yaml"))
+            .withArchives(Lists.newArrayList("/path/to/archive.zip"))
+            .build();
+
+    JobTemplateEntity sparkTemplateEntity =
+        JobTemplateEntity.builder()
+            .withName(sparkJobTemplate.name())
+            .withTemplateContent(
+                
JobTemplateEntity.TemplateContent.fromJobTemplate(sparkJobTemplate))
+            .withComment(sparkJobTemplate.comment())
+            .withNamespace(NamespaceUtil.ofJobTemplate("test"))
+            .withId(2L)
+            .withAuditInfo(
+                
AuditInfo.builder().withCreator("test").withCreateTime(Instant.now()).build())
+            .build();
+
+    JobTemplatePO.JobTemplatePOBuilder builder2 = 
JobTemplatePO.builder().withMetalakeId(1L);
+    JobTemplatePO sparkJobTemplatePO =
+        JobTemplatePO.initializeJobTemplatePO(sparkTemplateEntity, builder2);
+
+    JobTemplateEntity resultSparkEntity =
+        JobTemplatePO.fromJobTemplatePO(sparkJobTemplatePO, 
NamespaceUtil.ofJobTemplate("test"));
+
+    Assertions.assertEquals(sparkTemplateEntity.name(), 
resultSparkEntity.name());
+    Assertions.assertEquals(
+        sparkTemplateEntity.templateContent(), 
resultSparkEntity.templateContent());
+    Assertions.assertEquals(sparkTemplateEntity.comment(), 
resultSparkEntity.comment());
+    Assertions.assertEquals(sparkTemplateEntity.namespace(), 
resultSparkEntity.namespace());
+    Assertions.assertEquals(sparkTemplateEntity.id(), resultSparkEntity.id());
+    Assertions.assertEquals(
+        sparkTemplateEntity.auditInfo().creator(), 
resultSparkEntity.auditInfo().creator());
+  }
+
+  @Test
+  public void testJobPO() {
+    JobEntity jobEntity =
+        JobEntity.builder()
+            .withId(1L)
+            .withJobExecutionId("job-execution-1")
+            .withJobTemplateName("test-job-template")
+            .withStatus(JobHandle.Status.QUEUED)
+            .withNamespace(NamespaceUtil.ofJob("test"))
+            .withAuditInfo(
+                
AuditInfo.builder().withCreator("test").withCreateTime(Instant.now()).build())
+            .build();
+
+    JobPO.JobPOBuilder builder = JobPO.builder().withMetalakeId(1L);
+    JobPO jobPO = JobPO.initializeJobPO(jobEntity, builder);
+    JobEntity resultEntity = JobPO.fromJobPO(jobPO, 
NamespaceUtil.ofJob("test"));
+
+    Assertions.assertEquals(jobEntity.id(), resultEntity.id());
+    Assertions.assertEquals(jobEntity.jobExecutionId(), 
resultEntity.jobExecutionId());
+    Assertions.assertEquals(jobEntity.jobTemplateName(), 
resultEntity.jobTemplateName());
+    Assertions.assertEquals(jobEntity.status(), resultEntity.status());
+    Assertions.assertEquals(jobEntity.namespace(), resultEntity.namespace());
+    Assertions.assertEquals(jobEntity.auditInfo().creator(), 
resultEntity.auditInfo().creator());
+  }
+}
diff --git 
a/core/src/test/java/org/apache/gravitino/storage/relational/service/TestJobMetaService.java
 
b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestJobMetaService.java
new file mode 100644
index 0000000000..6c24addb95
--- /dev/null
+++ 
b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestJobMetaService.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.service;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.List;
+import org.apache.gravitino.EntityAlreadyExistsException;
+import org.apache.gravitino.exceptions.NoSuchEntityException;
+import org.apache.gravitino.job.JobHandle;
+import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.meta.BaseMetalake;
+import org.apache.gravitino.meta.JobEntity;
+import org.apache.gravitino.meta.JobTemplateEntity;
+import org.apache.gravitino.storage.RandomIdGenerator;
+import org.apache.gravitino.storage.relational.TestJDBCBackend;
+import org.apache.gravitino.utils.NameIdentifierUtil;
+import org.apache.gravitino.utils.NamespaceUtil;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class TestJobMetaService extends TestJDBCBackend {
+
+  private static final String METALAKE_NAME = "metalake_test_job_meta_service";
+
+  private static final AuditInfo AUDIT_INFO =
+      
AuditInfo.builder().withCreator("test").withCreateTime(Instant.now()).build();
+
+  @Test
+  public void testInsertAndListJobs() throws IOException {
+    BaseMetalake metalake =
+        createBaseMakeLake(RandomIdGenerator.INSTANCE.nextId(), METALAKE_NAME, 
AUDIT_INFO);
+    backend.insert(metalake, false);
+
+    JobTemplateEntity jobTemplate =
+        TestJobTemplateMetaService.newShellJobTemplateEntity(
+            "test_job_template", "test_comment", METALAKE_NAME);
+    JobTemplateMetaService.getInstance().insertJobTemplate(jobTemplate, false);
+
+    JobEntity job1 =
+        TestJobTemplateMetaService.newJobEntity(
+            jobTemplate.name(), JobHandle.Status.QUEUED, METALAKE_NAME);
+    Assertions.assertDoesNotThrow(() -> 
JobMetaService.getInstance().insertJob(job1, false));
+
+    JobEntity job2 =
+        TestJobTemplateMetaService.newJobEntity(
+            jobTemplate.name(), JobHandle.Status.QUEUED, METALAKE_NAME);
+    Assertions.assertDoesNotThrow(() -> 
JobMetaService.getInstance().insertJob(job2, false));
+
+    List<JobEntity> jobs =
+        
JobMetaService.getInstance().listJobsByNamespace(NamespaceUtil.ofJob(METALAKE_NAME));
+    Assertions.assertEquals(2, jobs.size());
+    Assertions.assertTrue(jobs.contains(job1));
+    Assertions.assertTrue(jobs.contains(job2));
+
+    // Test listing jobs by job template identifier
+    List<JobEntity> jobsByTemplate =
+        
JobMetaService.getInstance().listJobsByTemplateIdent(jobTemplate.nameIdentifier());
+    Assertions.assertEquals(2, jobsByTemplate.size());
+    Assertions.assertTrue(jobsByTemplate.contains(job1));
+    Assertions.assertTrue(jobsByTemplate.contains(job2));
+
+    // Test listing jobs by non-existing template identifier
+    List<JobEntity> emptyJobs =
+        JobMetaService.getInstance()
+            .listJobsByTemplateIdent(
+                NameIdentifierUtil.ofJobTemplate(METALAKE_NAME, 
"non_existing_template"));
+    Assertions.assertTrue(emptyJobs.isEmpty());
+  }
+
+  @Test
+  public void testInsertAndGetJob() throws IOException {
+    BaseMetalake metalake =
+        createBaseMakeLake(RandomIdGenerator.INSTANCE.nextId(), METALAKE_NAME, 
AUDIT_INFO);
+    backend.insert(metalake, false);
+
+    JobTemplateEntity jobTemplate =
+        TestJobTemplateMetaService.newShellJobTemplateEntity(
+            "test_job_template", "test_comment", METALAKE_NAME);
+    JobTemplateMetaService.getInstance().insertJobTemplate(jobTemplate, false);
+
+    JobEntity job =
+        TestJobTemplateMetaService.newJobEntity(
+            jobTemplate.name(), JobHandle.Status.QUEUED, METALAKE_NAME);
+    Assertions.assertDoesNotThrow(() -> 
JobMetaService.getInstance().insertJob(job, false));
+
+    JobEntity retrievedJob =
+        JobMetaService.getInstance()
+            .getJobByIdentifier(NameIdentifierUtil.ofJob(METALAKE_NAME, 
job.name()));
+    Assertions.assertEquals(job, retrievedJob);
+
+    // Test getting a job with a non-existing identifier
+    Assertions.assertThrows(
+        NoSuchEntityException.class,
+        () ->
+            JobMetaService.getInstance()
+                .getJobByIdentifier(NameIdentifierUtil.ofJob(METALAKE_NAME, 
"non_existing_job")));
+
+    // Test insert duplicate job
+    Assertions.assertThrows(
+        EntityAlreadyExistsException.class,
+        () -> JobMetaService.getInstance().insertJob(job, false));
+
+    // Test insert job with overwrite
+    JobEntity jobOverwrite =
+        JobEntity.builder()
+            .withId(job.id())
+            .withJobExecutionId("job-execution-new")
+            .withStatus(JobHandle.Status.FAILED)
+            .withNamespace(job.namespace())
+            .withAuditInfo(job.auditInfo())
+            .withJobTemplateName(job.jobTemplateName())
+            .build();
+    Assertions.assertDoesNotThrow(() -> 
JobMetaService.getInstance().insertJob(jobOverwrite, true));
+    JobEntity updatedJob =
+        JobMetaService.getInstance()
+            .getJobByIdentifier(NameIdentifierUtil.ofJob(METALAKE_NAME, 
jobOverwrite.name()));
+    Assertions.assertEquals(jobOverwrite, updatedJob);
+  }
+
+  @Test
+  public void testDeleteJobsByLegacyTimeline() throws IOException {
+    BaseMetalake metalake =
+        createBaseMakeLake(RandomIdGenerator.INSTANCE.nextId(), METALAKE_NAME, 
AUDIT_INFO);
+    backend.insert(metalake, false);
+
+    JobTemplateEntity jobTemplate =
+        TestJobTemplateMetaService.newShellJobTemplateEntity(
+            "test_job_template", "test_comment", METALAKE_NAME);
+    JobTemplateMetaService.getInstance().insertJobTemplate(jobTemplate, false);
+
+    JobEntity job =
+        TestJobTemplateMetaService.newJobEntity(
+            jobTemplate.name(), JobHandle.Status.QUEUED, METALAKE_NAME);
+    Assertions.assertDoesNotThrow(() -> 
JobMetaService.getInstance().insertJob(job, false));
+
+    JobEntity retrievedJob =
+        JobMetaService.getInstance()
+            .getJobByIdentifier(NameIdentifierUtil.ofJob(METALAKE_NAME, 
job.name()));
+    Assertions.assertEquals(job, retrievedJob);
+
+    long timestamp = System.currentTimeMillis();
+    JobEntity updatedJob =
+        JobEntity.builder()
+            .withId(job.id())
+            .withJobExecutionId(job.jobExecutionId())
+            .withStatus(JobHandle.Status.SUCCEEDED)
+            .withNamespace(job.namespace())
+            .withAuditInfo(job.auditInfo())
+            .withJobTemplateName(job.jobTemplateName())
+            .build();
+    Assertions.assertDoesNotThrow(() -> 
JobMetaService.getInstance().insertJob(updatedJob, true));
+
+    retrievedJob =
+        JobMetaService.getInstance()
+            .getJobByIdentifier(NameIdentifierUtil.ofJob(METALAKE_NAME, 
updatedJob.name()));
+    Assertions.assertEquals(updatedJob, retrievedJob);
+
+    long newTimestamp = timestamp + 1000;
+    Assertions.assertDoesNotThrow(
+        () -> 
JobMetaService.getInstance().deleteJobsByLegacyTimeline(newTimestamp, 10));
+
+    List<JobEntity> jobs =
+        
JobMetaService.getInstance().listJobsByNamespace(NamespaceUtil.ofJob(METALAKE_NAME));
+    Assertions.assertTrue(jobs.isEmpty(), "Jobs should be deleted by legacy 
timeline");
+  }
+}
diff --git 
a/core/src/test/java/org/apache/gravitino/storage/relational/service/TestJobTemplateMetaService.java
 
b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestJobTemplateMetaService.java
new file mode 100644
index 0000000000..2eb21c7620
--- /dev/null
+++ 
b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestJobTemplateMetaService.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.service;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.List;
+import org.apache.gravitino.EntityAlreadyExistsException;
+import org.apache.gravitino.exceptions.NoSuchEntityException;
+import org.apache.gravitino.job.JobHandle;
+import org.apache.gravitino.job.ShellJobTemplate;
+import org.apache.gravitino.job.SparkJobTemplate;
+import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.meta.BaseMetalake;
+import org.apache.gravitino.meta.JobEntity;
+import org.apache.gravitino.meta.JobTemplateEntity;
+import org.apache.gravitino.storage.RandomIdGenerator;
+import org.apache.gravitino.storage.relational.TestJDBCBackend;
+import org.apache.gravitino.utils.NameIdentifierUtil;
+import org.apache.gravitino.utils.NamespaceUtil;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class TestJobTemplateMetaService extends TestJDBCBackend {
+
+  private static final String METALAKE_NAME = "metalake_for_job_template_test";
+
+  private static final AuditInfo AUDIT_INFO =
+      
AuditInfo.builder().withCreator("test").withCreateTime(Instant.now()).build();
+
+  @Test
+  public void testInsertAndListJobTemplates() throws IOException {
+    BaseMetalake metalake =
+        createBaseMakeLake(RandomIdGenerator.INSTANCE.nextId(), METALAKE_NAME, 
AUDIT_INFO);
+    backend.insert(metalake, false);
+
+    JobTemplateMetaService jobTemplateMetaService = 
JobTemplateMetaService.getInstance();
+    Assertions.assertThrows(
+        NoSuchEntityException.class,
+        () ->
+            jobTemplateMetaService.getJobTemplateByIdentifier(
+                NameIdentifierUtil.ofJobTemplate(METALAKE_NAME, 
"non_existent_template")));
+
+    JobTemplateEntity testJobTemplateEntity1 =
+        newShellJobTemplateEntity(
+            "test_shell_template_1", "This is a test shell job template 1", 
METALAKE_NAME);
+
+    Assertions.assertDoesNotThrow(
+        () -> jobTemplateMetaService.insertJobTemplate(testJobTemplateEntity1, 
false));
+
+    JobTemplateEntity testJobTemplateEntity2 =
+        newSparkJobTemplateEntity(
+            "test_spark_template_1", "This is a test spark job template 1", 
METALAKE_NAME);
+
+    Assertions.assertDoesNotThrow(
+        () -> jobTemplateMetaService.insertJobTemplate(testJobTemplateEntity2, 
false));
+
+    List<JobTemplateEntity> jobTemplates =
+        jobTemplateMetaService.listJobTemplatesByNamespace(
+            NamespaceUtil.ofJobTemplate(METALAKE_NAME));
+
+    Assertions.assertEquals(2, jobTemplates.size());
+    Assertions.assertTrue(jobTemplates.contains(testJobTemplateEntity1));
+    Assertions.assertTrue(jobTemplates.contains(testJobTemplateEntity2));
+
+    // Test insert duplicate job template without overwrite
+    Assertions.assertThrows(
+        EntityAlreadyExistsException.class,
+        () -> jobTemplateMetaService.insertJobTemplate(testJobTemplateEntity1, 
false));
+
+    // Test insert duplicate job template with overwrite
+    JobTemplateEntity updatedJobTemplateEntity1 =
+        JobTemplateEntity.builder()
+            .withName(testJobTemplateEntity1.name())
+            .withComment("Updated comment for test shell job template 1")
+            .withId(testJobTemplateEntity1.id())
+            .withNamespace(testJobTemplateEntity1.namespace())
+            .withTemplateContent(testJobTemplateEntity1.templateContent())
+            .withAuditInfo(testJobTemplateEntity1.auditInfo())
+            .build();
+    Assertions.assertDoesNotThrow(
+        () -> 
jobTemplateMetaService.insertJobTemplate(updatedJobTemplateEntity1, true));
+    JobTemplateEntity fetchedJobTemplateEntity1 =
+        jobTemplateMetaService.getJobTemplateByIdentifier(
+            NameIdentifierUtil.ofJobTemplate(METALAKE_NAME, 
testJobTemplateEntity1.name()));
+    Assertions.assertEquals(updatedJobTemplateEntity1, 
fetchedJobTemplateEntity1);
+
+    jobTemplates =
+        jobTemplateMetaService.listJobTemplatesByNamespace(
+            NamespaceUtil.ofJobTemplate(METALAKE_NAME));
+    Assertions.assertEquals(2, jobTemplates.size());
+    Assertions.assertTrue(jobTemplates.contains(updatedJobTemplateEntity1));
+    Assertions.assertTrue(jobTemplates.contains(testJobTemplateEntity2));
+    Assertions.assertFalse(jobTemplates.contains(testJobTemplateEntity1));
+  }
+
+  @Test
+  public void testInsertAndSelectJobTemplate() throws IOException {
+    BaseMetalake metalake =
+        createBaseMakeLake(RandomIdGenerator.INSTANCE.nextId(), METALAKE_NAME, 
AUDIT_INFO);
+    backend.insert(metalake, false);
+
+    JobTemplateMetaService jobTemplateMetaService = 
JobTemplateMetaService.getInstance();
+
+    JobTemplateEntity shellJobTemplate =
+        newShellJobTemplateEntity("shell_template", "A shell job template", 
METALAKE_NAME);
+    jobTemplateMetaService.insertJobTemplate(shellJobTemplate, false);
+
+    JobTemplateEntity sparkJobTemplate =
+        newSparkJobTemplateEntity("spark_template", "A spark job template", 
METALAKE_NAME);
+    jobTemplateMetaService.insertJobTemplate(sparkJobTemplate, false);
+
+    // Test select by identifier
+    JobTemplateEntity fetchedShellJobTemplate =
+        jobTemplateMetaService.getJobTemplateByIdentifier(
+            NameIdentifierUtil.ofJobTemplate(METALAKE_NAME, "shell_template"));
+    Assertions.assertEquals(shellJobTemplate, fetchedShellJobTemplate);
+
+    JobTemplateEntity fetchedSparkJobTemplate =
+        jobTemplateMetaService.getJobTemplateByIdentifier(
+            NameIdentifierUtil.ofJobTemplate(METALAKE_NAME, "spark_template"));
+    Assertions.assertEquals(sparkJobTemplate, fetchedSparkJobTemplate);
+
+    // Test select non-existent template
+    Assertions.assertThrows(
+        NoSuchEntityException.class,
+        () ->
+            jobTemplateMetaService.getJobTemplateByIdentifier(
+                NameIdentifierUtil.ofJobTemplate(METALAKE_NAME, 
"non_existent_template")));
+  }
+
+  @Test
+  public void testInsertAndDeleteJobTemplate() throws IOException {
+    BaseMetalake metalake =
+        createBaseMakeLake(RandomIdGenerator.INSTANCE.nextId(), METALAKE_NAME, 
AUDIT_INFO);
+    backend.insert(metalake, false);
+
+    JobTemplateMetaService jobTemplateMetaService = 
JobTemplateMetaService.getInstance();
+
+    JobTemplateEntity jobTemplateEntity =
+        newShellJobTemplateEntity(
+            "job_template_to_delete", "A job template to delete", 
METALAKE_NAME);
+    jobTemplateMetaService.insertJobTemplate(jobTemplateEntity, false);
+
+    // Verify insertion
+    JobTemplateEntity fetchedJobTemplate =
+        jobTemplateMetaService.getJobTemplateByIdentifier(
+            NameIdentifierUtil.ofJobTemplate(METALAKE_NAME, 
"job_template_to_delete"));
+    Assertions.assertEquals(jobTemplateEntity, fetchedJobTemplate);
+
+    // Delete the job template
+    boolean deleted =
+        jobTemplateMetaService.deleteJobTemplate(
+            NameIdentifierUtil.ofJobTemplate(METALAKE_NAME, 
"job_template_to_delete"));
+    Assertions.assertTrue(deleted);
+
+    deleted =
+        jobTemplateMetaService.deleteJobTemplate(
+            NameIdentifierUtil.ofJobTemplate(METALAKE_NAME, 
"job_template_to_delete"));
+    Assertions.assertFalse(deleted);
+  }
+
+  @Test
+  public void testDeleteJobTemplateWithJobs() throws IOException {
+    BaseMetalake metalake =
+        createBaseMakeLake(RandomIdGenerator.INSTANCE.nextId(), METALAKE_NAME, 
AUDIT_INFO);
+    backend.insert(metalake, false);
+
+    JobTemplateMetaService jobTemplateMetaService = 
JobTemplateMetaService.getInstance();
+
+    JobTemplateEntity jobTemplateEntity =
+        newShellJobTemplateEntity(
+            "job_template_with_jobs", "A job template with jobs", 
METALAKE_NAME);
+    jobTemplateMetaService.insertJobTemplate(jobTemplateEntity, false);
+
+    // Create a job using the template
+    JobEntity jobEntity1 =
+        newJobEntity("job_template_with_jobs", JobHandle.Status.STARTED, 
METALAKE_NAME);
+    backend.insert(jobEntity1, false);
+
+    JobEntity jobEntity2 =
+        newJobEntity("job_template_with_jobs", JobHandle.Status.SUCCEEDED, 
METALAKE_NAME);
+    backend.insert(jobEntity2, false);
+
+    boolean deleted =
+        jobTemplateMetaService.deleteJobTemplate(
+            NameIdentifierUtil.ofJobTemplate(METALAKE_NAME, 
"job_template_with_jobs"));
+    Assertions.assertTrue(deleted);
+
+    // Verify that the jobs are deleted
+    List<JobEntity> jobs =
+        
JobMetaService.getInstance().listJobsByNamespace(NamespaceUtil.ofJob(METALAKE_NAME));
+    Assertions.assertEquals(0, jobs.size());
+  }
+
+  static JobTemplateEntity newShellJobTemplateEntity(String name, String 
comment, String metalake) {
+    ShellJobTemplate shellJobTemplate =
+        ShellJobTemplate.builder()
+            .withName(name)
+            .withComment(comment)
+            .withExecutable("/bin/echo")
+            .build();
+
+    return JobTemplateEntity.builder()
+        .withId(RandomIdGenerator.INSTANCE.nextId())
+        .withName(name)
+        .withNamespace(NamespaceUtil.ofJobTemplate(metalake))
+        
.withTemplateContent(JobTemplateEntity.TemplateContent.fromJobTemplate(shellJobTemplate))
+        .withAuditInfo(AUDIT_INFO)
+        .build();
+  }
+
+  static JobTemplateEntity newSparkJobTemplateEntity(String name, String 
comment, String metalake) {
+    SparkJobTemplate sparkJobTemplate =
+        SparkJobTemplate.builder()
+            .withName(name)
+            .withComment(comment)
+            .withClassName("org.apache.spark.examples.SparkPi")
+            .withExecutable("file:/path/to/spark-examples.jar")
+            .build();
+
+    return JobTemplateEntity.builder()
+        .withId(RandomIdGenerator.INSTANCE.nextId())
+        .withName(name)
+        .withNamespace(NamespaceUtil.ofJobTemplate(metalake))
+        
.withTemplateContent(JobTemplateEntity.TemplateContent.fromJobTemplate(sparkJobTemplate))
+        .withAuditInfo(AUDIT_INFO)
+        .build();
+  }
+
+  static JobEntity newJobEntity(String templateName, JobHandle.Status status, 
String metalake) {
+    return JobEntity.builder()
+        .withId(RandomIdGenerator.INSTANCE.nextId())
+        .withJobExecutionId(RandomIdGenerator.INSTANCE.nextId() + "")
+        .withNamespace(NamespaceUtil.ofJob(metalake))
+        .withJobTemplateName(templateName)
+        .withStatus(status)
+        .withAuditInfo(AUDIT_INFO)
+        .build();
+  }
+}
diff --git a/scripts/h2/schema-1.0.0-h2.sql b/scripts/h2/schema-1.0.0-h2.sql
index a02f0eb427..a245174735 100644
--- a/scripts/h2/schema-1.0.0-h2.sql
+++ b/scripts/h2/schema-1.0.0-h2.sql
@@ -383,3 +383,34 @@ CREATE TABLE IF NOT EXISTS `policy_relation_meta` (
     KEY `idx_pid` (`policy_id`),
     KEY `idx_prmid` (`metadata_object_id`)
 ) ENGINE=InnoDB;
+
+CREATE TABLE IF NOT EXISTS `job_template_meta` (
+    `job_template_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'job template id',
+    `job_template_name` VARCHAR(128) NOT NULL COMMENT 'job template name',
+    `metalake_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'metalake id',
+    `job_template_comment` TEXT DEFAULT NULL COMMENT 'job template comment',
+    `job_template_content` MEDIUMTEXT NOT NULL COMMENT 'job template content',
+    `audit_info` MEDIUMTEXT NOT NULL COMMENT 'job template audit info',
+    `current_version` INT UNSIGNED NOT NULL DEFAULT 1 COMMENT 'job template 
current version',
+    `last_version` INT UNSIGNED NOT NULL DEFAULT 1 COMMENT 'job template last 
version',
+    `deleted_at` BIGINT(20) UNSIGNED NOT NULL DEFAULT 0 COMMENT 'job template 
deleted at',
+    PRIMARY KEY (`job_template_id`),
+    UNIQUE KEY `uk_mid_jtn_del` (`metalake_id`, `job_template_name`, 
`deleted_at`)
+) ENGINE=InnoDB;
+
+CREATE TABLE IF NOT EXISTS `job_run_meta` (
+    `job_run_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'job run id',
+    `job_template_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'job template id',
+    `metalake_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'metalake id',
+    `job_execution_id` varchar(256) NOT NULL COMMENT 'job execution id',
+    `job_run_status` varchar(64) NOT NULL COMMENT 'job run status',
+    `job_finished_at` BIGINT(20) UNSIGNED NOT NULL DEFAULT 0 COMMENT 'job 
finished at',
+    `audit_info` MEDIUMTEXT NOT NULL COMMENT 'job run audit info',
+    `current_version` INT UNSIGNED NOT NULL DEFAULT 1 COMMENT 'job run current 
version',
+    `last_version` INT UNSIGNED NOT NULL DEFAULT 1 COMMENT 'job run last 
version',
+    `deleted_at` BIGINT(20) UNSIGNED NOT NULL DEFAULT 0 COMMENT 'job run 
deleted at',
+    PRIMARY KEY (`job_run_id`),
+    UNIQUE KEY `uk_mid_jei_del` (`metalake_id`, `job_execution_id`, 
`deleted_at`),
+    KEY `idx_job_template_id` (`job_template_id`),
+    KEY `idx_job_execution_id` (`job_execution_id`)
+) ENGINE=InnoDB;
diff --git a/scripts/h2/upgrade-0.9.0-to-1.0.0-h2.sql 
b/scripts/h2/upgrade-0.9.0-to-1.0.0-h2.sql
index f156d49551..7ceca25db4 100644
--- a/scripts/h2/upgrade-0.9.0-to-1.0.0-h2.sql
+++ b/scripts/h2/upgrade-0.9.0-to-1.0.0-h2.sql
@@ -67,4 +67,35 @@ ALTER TABLE `model_version_info` ADD COLUMN 
`model_version_uri_name` VARCHAR(256
 ALTER TABLE `model_version_info` DROP INDEX `uk_mid_ver_del`;
 ALTER TABLE `model_version_info` ADD CONSTRAINT `uk_mid_ver_uri_del` UNIQUE 
(`model_id`, `version`, `model_version_uri_name`, `deleted_at`);
 -- remove the default value for model_version_uri_name
-ALTER TABLE `model_version_info` ALTER COLUMN `model_version_uri_name` DROP 
DEFAULT;
\ No newline at end of file
+ALTER TABLE `model_version_info` ALTER COLUMN `model_version_uri_name` DROP 
DEFAULT;
+
+CREATE TABLE IF NOT EXISTS `job_template_meta` (
+    `job_template_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'job template id',
+    `job_template_name` VARCHAR(128) NOT NULL COMMENT 'job template name',
+    `metalake_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'metalake id',
+    `job_template_comment` TEXT DEFAULT NULL COMMENT 'job template comment',
+    `job_template_content` MEDIUMTEXT NOT NULL COMMENT 'job template content',
+    `audit_info` MEDIUMTEXT NOT NULL COMMENT 'job template audit info',
+    `current_version` INT UNSIGNED NOT NULL DEFAULT 1 COMMENT 'job template 
current version',
+    `last_version` INT UNSIGNED NOT NULL DEFAULT 1 COMMENT 'job template last 
version',
+    `deleted_at` BIGINT(20) UNSIGNED NOT NULL DEFAULT 0 COMMENT 'job template 
deleted at',
+    PRIMARY KEY (`job_template_id`),
+    UNIQUE KEY `uk_mid_jtn_del` (`metalake_id`, `job_template_name`, 
`deleted_at`)
+) ENGINE=InnoDB;
+
+CREATE TABLE IF NOT EXISTS `job_run_meta` (
+    `job_run_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'job run id',
+    `job_template_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'job template id',
+    `metalake_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'metalake id',
+    `job_execution_id` varchar(256) NOT NULL COMMENT 'job execution id',
+    `job_run_status` varchar(64) NOT NULL COMMENT 'job run status',
+    `job_finished_at` BIGINT(20) UNSIGNED NOT NULL DEFAULT 0 COMMENT 'job 
finished at',
+    `audit_info` MEDIUMTEXT NOT NULL COMMENT 'job run audit info',
+    `current_version` INT UNSIGNED NOT NULL DEFAULT 1 COMMENT 'job run current 
version',
+    `last_version` INT UNSIGNED NOT NULL DEFAULT 1 COMMENT 'job run last 
version',
+    `deleted_at` BIGINT(20) UNSIGNED NOT NULL DEFAULT 0 COMMENT 'job run 
deleted at',
+    PRIMARY KEY (`job_run_id`),
+    UNIQUE KEY `uk_mid_jei_del` (`metalake_id`, `job_execution_id`, 
`deleted_at`),
+    KEY `idx_job_template_id` (`job_template_id`),
+    KEY `idx_job_execution_id` (`job_execution_id`)
+) ENGINE=InnoDB;
diff --git a/scripts/mysql/schema-1.0.0-mysql.sql 
b/scripts/mysql/schema-1.0.0-mysql.sql
index b11c729432..07c14ad304 100644
--- a/scripts/mysql/schema-1.0.0-mysql.sql
+++ b/scripts/mysql/schema-1.0.0-mysql.sql
@@ -373,4 +373,35 @@ CREATE TABLE IF NOT EXISTS `policy_relation_meta` (
     UNIQUE KEY `uk_pi_mi_mo_del` (`policy_id`, `metadata_object_id`, 
`metadata_object_type`, `deleted_at`),
     KEY `idx_pid` (`policy_id`),
     KEY `idx_mid` (`metadata_object_id`)
-) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT 'policy 
metadata object relation';
\ No newline at end of file
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT 'policy 
metadata object relation';
+
+CREATE TABLE IF NOT EXISTS `job_template_meta` (
+    `job_template_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'job template id',
+    `job_template_name` VARCHAR(128) NOT NULL COMMENT 'job template name',
+    `metalake_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'metalake id',
+    `job_template_comment` TEXT DEFAULT NULL COMMENT 'job template comment',
+    `job_template_content` MEDIUMTEXT NOT NULL COMMENT 'job template content',
+    `audit_info` MEDIUMTEXT NOT NULL COMMENT 'job template audit info',
+    `current_version` INT UNSIGNED NOT NULL DEFAULT 1 COMMENT 'job template 
current version',
+    `last_version` INT UNSIGNED NOT NULL DEFAULT 1 COMMENT 'job template last 
version',
+    `deleted_at` BIGINT(20) UNSIGNED NOT NULL DEFAULT 0 COMMENT 'job template 
deleted at',
+    PRIMARY KEY (`job_template_id`),
+    UNIQUE KEY `uk_mid_jtn_del` (`metalake_id`, `job_template_name`, 
`deleted_at`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT 'job 
template metadata';
+
+CREATE TABLE IF NOT EXISTS `job_run_meta` (
+    `job_run_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'job run id',
+    `job_template_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'job template id',
+    `metalake_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'metalake id',
+    `job_execution_id` varchar(256) NOT NULL COMMENT 'job execution id',
+    `job_run_status` varchar(64) NOT NULL COMMENT 'job run status',
+    `job_finished_at` BIGINT(20) UNSIGNED NOT NULL DEFAULT 0 COMMENT 'job 
finished at',
+    `audit_info` MEDIUMTEXT NOT NULL COMMENT 'job run audit info',
+    `current_version` INT UNSIGNED NOT NULL DEFAULT 1 COMMENT 'job run current 
version',
+    `last_version` INT UNSIGNED NOT NULL DEFAULT 1 COMMENT 'job run last 
version',
+    `deleted_at` BIGINT(20) UNSIGNED NOT NULL DEFAULT 0 COMMENT 'job run 
deleted at',
+    PRIMARY KEY (`job_run_id`),
+    UNIQUE KEY `uk_mid_jei_del` (`metalake_id`, `job_execution_id`, 
`deleted_at`),
+    KEY `idx_job_template_id` (`job_template_id`),
+    KEY `idx_job_execution_id` (`job_execution_id`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT 'job run 
metadata';
diff --git a/scripts/mysql/upgrade-0.9.0-to-1.0.0-mysql.sql 
b/scripts/mysql/upgrade-0.9.0-to-1.0.0-mysql.sql
index 48438ddfbe..c0890b5aa0 100644
--- a/scripts/mysql/upgrade-0.9.0-to-1.0.0-mysql.sql
+++ b/scripts/mysql/upgrade-0.9.0-to-1.0.0-mysql.sql
@@ -68,3 +68,34 @@ ALTER TABLE `model_version_info` DROP INDEX `uk_mid_ver_del`;
 ALTER TABLE `model_version_info` ADD CONSTRAINT `uk_mid_ver_uri_del` UNIQUE 
KEY (`model_id`, `version`, `model_version_uri_name`, `deleted_at`);
 -- remove the default value for model_version_uri_name
 ALTER TABLE `model_version_info` ALTER COLUMN `model_version_uri_name` DROP 
DEFAULT;
+
+CREATE TABLE IF NOT EXISTS `job_template_meta` (
+    `job_template_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'job template id',
+    `job_template_name` VARCHAR(128) NOT NULL COMMENT 'job template name',
+    `metalake_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'metalake id',
+    `job_template_comment` TEXT DEFAULT NULL COMMENT 'job template comment',
+    `job_template_content` MEDIUMTEXT NOT NULL COMMENT 'job template content',
+    `audit_info` MEDIUMTEXT NOT NULL COMMENT 'job template audit info',
+    `current_version` INT UNSIGNED NOT NULL DEFAULT 1 COMMENT 'job template 
current version',
+    `last_version` INT UNSIGNED NOT NULL DEFAULT 1 COMMENT 'job template last 
version',
+    `deleted_at` BIGINT(20) UNSIGNED NOT NULL DEFAULT 0 COMMENT 'job template 
deleted at',
+    PRIMARY KEY (`job_template_id`),
+    UNIQUE KEY `uk_mid_jtn_del` (`metalake_id`, `job_template_name`, 
`deleted_at`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT 'job 
template metadata';
+
+CREATE TABLE IF NOT EXISTS `job_run_meta` (
+    `job_run_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'job run id',
+    `job_template_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'job template id',
+    `metalake_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'metalake id',
+    `job_execution_id` varchar(256) NOT NULL COMMENT 'job execution id',
+    `job_run_status` varchar(64) NOT NULL COMMENT 'job run status',
+    `job_finished_at` BIGINT(20) UNSIGNED NOT NULL DEFAULT 0 COMMENT 'job 
finished at',
+    `audit_info` MEDIUMTEXT NOT NULL COMMENT 'job run audit info',
+    `current_version` INT UNSIGNED NOT NULL DEFAULT 1 COMMENT 'job run current 
version',
+    `last_version` INT UNSIGNED NOT NULL DEFAULT 1 COMMENT 'job run last 
version',
+    `deleted_at` BIGINT(20) UNSIGNED NOT NULL DEFAULT 0 COMMENT 'job run 
deleted at',
+    PRIMARY KEY (`job_run_id`),
+    UNIQUE KEY `uk_mid_jei_del` (`metalake_id`, `job_execution_id`, 
`deleted_at`),
+    KEY `idx_job_template_id` (`job_template_id`),
+    KEY `idx_job_execution_id` (`job_execution_id`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT 'job run 
metadata';
diff --git a/scripts/postgresql/schema-1.0.0-postgresql.sql 
b/scripts/postgresql/schema-1.0.0-postgresql.sql
index 7c8ee084fa..d97ace9844 100644
--- a/scripts/postgresql/schema-1.0.0-postgresql.sql
+++ b/scripts/postgresql/schema-1.0.0-postgresql.sql
@@ -668,3 +668,59 @@ COMMENT ON COLUMN policy_relation_meta.audit_info IS 
'policy relation audit info
 COMMENT ON COLUMN policy_relation_meta.current_version IS 'policy relation 
current version';
 COMMENT ON COLUMN policy_relation_meta.last_version IS 'policy relation last 
version';
 COMMENT ON COLUMN policy_relation_meta.deleted_at IS 'policy relation deleted 
at';
+
+
+CREATE TABLE IF NOT EXISTS job_template_meta (
+    job_template_id BIGINT NOT NULL,
+    job_template_name VARCHAR(128) NOT NULL,
+    metalake_id BIGINT NOT NULL,
+    job_template_comment TEXT DEFAULT NULL,
+    job_template_content TEXT NOT NULL,
+    audit_info TEXT NOT NULL,
+    current_version INT NOT NULL DEFAULT 1,
+    last_version INT NOT NULL DEFAULT 1,
+    deleted_at BIGINT NOT NULL DEFAULT 0,
+    PRIMARY KEY (job_template_id),
+    UNIQUE (metalake_id, job_template_name, deleted_at)
+);
+
+COMMENT ON TABLE job_template_meta IS 'job template metadata';
+COMMENT ON COLUMN job_template_meta.job_template_id IS 'job template id';
+COMMENT ON COLUMN job_template_meta.job_template_name IS 'job template name';
+COMMENT ON COLUMN job_template_meta.metalake_id IS 'metalake id';
+COMMENT ON COLUMN job_template_meta.job_template_comment IS 'job template 
comment';
+COMMENT ON COLUMN job_template_meta.job_template_content IS 'job template 
content';
+COMMENT ON COLUMN job_template_meta.audit_info IS 'job template audit info';
+COMMENT ON COLUMN job_template_meta.current_version IS 'job template current 
version';
+COMMENT ON COLUMN job_template_meta.last_version IS 'job template last 
version';
+COMMENT ON COLUMN job_template_meta.deleted_at IS 'job template deleted at';
+
+
+CREATE TABLE IF NOT EXISTS job_run_meta (
+    job_run_id BIGINT NOT NULL,
+    job_template_id BIGINT NOT NULL,
+    metalake_id BIGINT NOT NULL,
+    job_execution_id VARCHAR(256) NOT NULL,
+    job_run_status VARCHAR(64) NOT NULL,
+    job_finished_at BIGINT NOT NULL DEFAULT 0,
+    audit_info TEXT NOT NULL,
+    current_version INT NOT NULL DEFAULT 1,
+    last_version INT NOT NULL DEFAULT 1,
+    deleted_at BIGINT NOT NULL DEFAULT 0,
+    PRIMARY KEY (job_run_id),
+    UNIQUE (metalake_id, job_execution_id, deleted_at)
+);
+
+CREATE INDEX IF NOT EXISTS idx_job_template_id ON job_run_meta 
(job_template_id);
+CREATE INDEX IF NOT EXISTS idx_job_execution_id ON job_run_meta 
(job_execution_id);
+COMMENT ON TABLE job_run_meta IS 'job run metadata';
+COMMENT ON COLUMN job_run_meta.job_run_id IS 'job run id';
+COMMENT ON COLUMN job_run_meta.job_template_id IS 'job template id';
+COMMENT ON COLUMN job_run_meta.metalake_id IS 'metalake id';
+COMMENT ON COLUMN job_run_meta.job_execution_id IS 'job execution id';
+COMMENT ON COLUMN job_run_meta.job_run_status IS 'job run status';
+COMMENT ON COLUMN job_run_meta.job_finished_at IS 'job run finished at';
+COMMENT ON COLUMN job_run_meta.audit_info IS 'job run audit info';
+COMMENT ON COLUMN job_run_meta.current_version IS 'job run current version';
+COMMENT ON COLUMN job_run_meta.last_version IS 'job run last version';
+COMMENT ON COLUMN job_run_meta.deleted_at IS 'job run deleted at';
diff --git a/scripts/postgresql/upgrade-0.9.0-to-1.0.0-postgresql.sql 
b/scripts/postgresql/upgrade-0.9.0-to-1.0.0-postgresql.sql
index e96d1a82ae..a95ad69517 100644
--- a/scripts/postgresql/upgrade-0.9.0-to-1.0.0-postgresql.sql
+++ b/scripts/postgresql/upgrade-0.9.0-to-1.0.0-postgresql.sql
@@ -104,3 +104,59 @@ ALTER TABLE model_version_info DROP CONSTRAINT 
model_version_info_model_id_versi
 ALTER TABLE model_version_info ADD CONSTRAINT uk_mid_ver_uri_del UNIQUE 
(model_id, version, model_version_uri_name, deleted_at);
 -- remove the default value for model_version_uri_name
 ALTER TABLE model_version_info ALTER COLUMN model_version_uri_name DROP 
DEFAULT;
+
+
+CREATE TABLE IF NOT EXISTS job_template_meta (
+    job_template_id BIGINT NOT NULL,
+    job_template_name VARCHAR(128) NOT NULL,
+    metalake_id BIGINT NOT NULL,
+    job_template_comment TEXT DEFAULT NULL,
+    job_template_content TEXT NOT NULL,
+    audit_info TEXT NOT NULL,
+    current_version INT NOT NULL DEFAULT 1,
+    last_version INT NOT NULL DEFAULT 1,
+    deleted_at BIGINT NOT NULL DEFAULT 0,
+    PRIMARY KEY (job_template_id),
+    UNIQUE (metalake_id, job_template_name, deleted_at)
+);
+
+COMMENT ON TABLE job_template_meta IS 'job template metadata';
+COMMENT ON COLUMN job_template_meta.job_template_id IS 'job template id';
+COMMENT ON COLUMN job_template_meta.job_template_name IS 'job template name';
+COMMENT ON COLUMN job_template_meta.metalake_id IS 'metalake id';
+COMMENT ON COLUMN job_template_meta.job_template_comment IS 'job template 
comment';
+COMMENT ON COLUMN job_template_meta.job_template_content IS 'job template 
content';
+COMMENT ON COLUMN job_template_meta.audit_info IS 'job template audit info';
+COMMENT ON COLUMN job_template_meta.current_version IS 'job template current 
version';
+COMMENT ON COLUMN job_template_meta.last_version IS 'job template last 
version';
+COMMENT ON COLUMN job_template_meta.deleted_at IS 'job template deleted at';
+
+
+CREATE TABLE IF NOT EXISTS job_run_meta (
+    job_run_id BIGINT NOT NULL,
+    job_template_id BIGINT NOT NULL,
+    metalake_id BIGINT NOT NULL,
+    job_execution_id VARCHAR(256) NOT NULL,
+    job_run_status VARCHAR(64) NOT NULL,
+    job_finished_at BIGINT NOT NULL DEFAULT 0,
+    audit_info TEXT NOT NULL,
+    current_version INT NOT NULL DEFAULT 1,
+    last_version INT NOT NULL DEFAULT 1,
+    deleted_at BIGINT NOT NULL DEFAULT 0,
+    PRIMARY KEY (job_run_id),
+    UNIQUE (metalake_id, job_execution_id, deleted_at)
+);
+
+CREATE INDEX IF NOT EXISTS idx_job_template_id ON job_run_meta 
(job_template_id);
+CREATE INDEX IF NOT EXISTS idx_job_execution_id ON job_run_meta 
(job_execution_id);
+COMMENT ON TABLE job_run_meta IS 'job run metadata';
+COMMENT ON COLUMN job_run_meta.job_run_id IS 'job run id';
+COMMENT ON COLUMN job_run_meta.job_template_id IS 'job template id';
+COMMENT ON COLUMN job_run_meta.metalake_id IS 'metalake id';
+COMMENT ON COLUMN job_run_meta.job_execution_id IS 'job execution id';
+COMMENT ON COLUMN job_run_meta.job_run_status IS 'job run status';
+COMMENT ON COLUMN job_run_meta.job_finished_at IS 'job finished at';
+COMMENT ON COLUMN job_run_meta.audit_info IS 'job run audit info';
+COMMENT ON COLUMN job_run_meta.current_version IS 'job run current version';
+COMMENT ON COLUMN job_run_meta.last_version IS 'job run last version';
+COMMENT ON COLUMN job_run_meta.deleted_at IS 'job run deleted at';

Reply via email to