This is an automated email from the ASF dual-hosted git repository.

fanng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new fc757d987e [#9541] feat(core): Add built-in job template framework in 
Gravitino (#9542)
fc757d987e is described below

commit fc757d987e79c1fa7285095066cf53f6a2fdad7a
Author: Jerry Shao <[email protected]>
AuthorDate: Wed Dec 31 15:39:43 2025 +0800

    [#9541] feat(core): Add built-in job template framework in Gravitino (#9542)
    
    ### What changes were proposed in this pull request?
    
    This PR adds built-in job template support to Gravitino's job system.
    
    ### Why are the changes needed?
    
    Fix: #9541
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    UTs.
---
 .../IllegalJobTemplateOperationException.java      |  54 +++
 .../apache/gravitino/job/JobTemplateProvider.java  |  49 ++
 build.gradle.kts                                   |  30 +-
 .../org/apache/gravitino/client/ErrorHandlers.java |   9 +-
 .../gravitino/client/integration/test/JobIT.java   |   7 -
 .../test/authorization/JobAuthorizationIT.java     |   7 +-
 clients/client-python/gravitino/exceptions/base.py |   8 +
 .../exceptions/handlers/job_error_handler.py       |   5 +
 .../tests/integration/test_supports_jobs.py        |   7 -
 .../org/apache/gravitino/json/TestSerializer.java  |  42 +-
 .../java/org/apache/gravitino/GravitinoEnv.java    |  14 +-
 .../job/BuiltInJobTemplateEventListener.java       | 395 ++++++++++++++++
 .../java/org/apache/gravitino/job/JobManager.java  |  25 +-
 .../job/JobTemplateValidationDispatcher.java       | 182 +++++++
 .../apache/gravitino/metalake/MetalakeManager.java |  25 +
 .../job/TestBuiltInJobTemplateEventListener.java   | 525 +++++++++++++++++++++
 .../org/apache/gravitino/job/TestJobManager.java   |  11 +
 .../job/TestJobTemplateValidationDispatcher.java   | 254 ++++++++++
 .../gravitino/metalake/TestMetalakeManager.java    |  30 ++
 maintenance/jobs/build.gradle.kts                  |  69 +++
 .../gravitino/maintenance/jobs/BuiltInJob.java     |  47 ++
 .../jobs/BuiltInJobTemplateProvider.java           |  68 +++
 .../maintenance/jobs/spark/SparkPiJob.java         | 104 ++++
 .../org.apache.gravitino.job.JobTemplateProvider   |  19 +
 .../gravitino/maintenance/jobs/TestBuiltInJob.java |  58 +++
 .../jobs/TestBuiltInJobTemplateProvider.java       |  66 +++
 .../maintenance/jobs/spark/TestSparkPiJob.java     | 134 ++++++
 .../server/web/rest/ExceptionHandlers.java         |   6 +-
 settings.gradle.kts                                |   2 +-
 29 files changed, 2178 insertions(+), 74 deletions(-)

diff --git 
a/api/src/main/java/org/apache/gravitino/exceptions/IllegalJobTemplateOperationException.java
 
b/api/src/main/java/org/apache/gravitino/exceptions/IllegalJobTemplateOperationException.java
new file mode 100644
index 0000000000..764c8ef722
--- /dev/null
+++ 
b/api/src/main/java/org/apache/gravitino/exceptions/IllegalJobTemplateOperationException.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.exceptions;
+
+import com.google.errorprone.annotations.FormatMethod;
+import com.google.errorprone.annotations.FormatString;
+
+/**
+ * Exception thrown when an illegal operation is attempted on a job template, 
such as trying to
+ * modify or delete a built-in job template, or creating a user template with 
a reserved name.
+ */
+public class IllegalJobTemplateOperationException extends 
IllegalArgumentException {
+
+  /**
+   * Constructs a new IllegalJobTemplateOperationException with the specified 
detail message.
+   *
+   * @param message the detail message
+   * @param args the arguments to the message
+   */
+  @FormatMethod
+  public IllegalJobTemplateOperationException(@FormatString String message, 
Object... args) {
+    super(String.format(message, args));
+  }
+
+  /**
+   * Constructs a new IllegalJobTemplateOperationException with the specified 
detail message and
+   * cause.
+   *
+   * @param cause the cause
+   * @param message the detail message
+   * @param args the arguments to the message
+   */
+  @FormatMethod
+  public IllegalJobTemplateOperationException(
+      Throwable cause, @FormatString String message, Object... args) {
+    super(String.format(message, args), cause);
+  }
+}
diff --git 
a/api/src/main/java/org/apache/gravitino/job/JobTemplateProvider.java 
b/api/src/main/java/org/apache/gravitino/job/JobTemplateProvider.java
new file mode 100644
index 0000000000..2dfee7ed1a
--- /dev/null
+++ b/api/src/main/java/org/apache/gravitino/job/JobTemplateProvider.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.job;
+
+import java.util.List;
+
+/**
+ * JobTemplateProvider exposes job templates that can be registered into 
Gravitino.
+ *
+ * <p>Implementations are expected to be discoverable (for example via SPI) so 
that Gravitino can
+ * onboard built-in job templates automatically.
+ */
+public interface JobTemplateProvider {
+
+  /** Prefix for built-in job template names. */
+  String BUILTIN_NAME_PREFIX = "builtin-";
+
+  /** Regex to validate built-in job template names. */
+  String BUILTIN_NAME_PATTERN = "^" + BUILTIN_NAME_PREFIX + "[\\w-]+$";
+
+  /** Property key used to carry the built-in job template version. */
+  String PROPERTY_VERSION_KEY = "version";
+
+  /** Regex for version property value (e.g., v1, v2). */
+  String VERSION_VALUE_PATTERN = "v\\d+";
+
+  /**
+   * Return all job templates provided by this implementation.
+   *
+   * @return a list of job templates to register
+   */
+  List<? extends JobTemplate> jobTemplates();
+}
diff --git a/build.gradle.kts b/build.gradle.kts
index aaa743b7ac..eb8b45ade9 100644
--- a/build.gradle.kts
+++ b/build.gradle.kts
@@ -314,14 +314,22 @@ subprojects {
   }
 
   fun compatibleWithJDK8(project: Project): Boolean {
+    val name = project.name.lowercase()
+    val path = project.path.lowercase()
+    if (path.startsWith(":maintenance:jobs") ||
+      name == "api" ||
+      name == "common" ||
+      name == "catalog-common" ||
+      name == "hadoop-common"
+    ) {
+      return true
+    }
+
     val isReleaseRun = gradle.startParameter.taskNames.any { it == "release" 
|| it == "publish" || it == "publishToMavenLocal" }
     if (!isReleaseRun) {
       return false
     }
 
-    val name = project.name.lowercase()
-    val path = project.path.lowercase()
-
     if (path.startsWith(":client") ||
       path.startsWith(":spark-connector") ||
       path.startsWith(":flink-connector") ||
@@ -330,12 +338,6 @@ subprojects {
       return true
     }
 
-    if (name == "api" || name == "common" ||
-      name == "catalog-common" || name == "hadoop-common"
-    ) {
-      return true
-    }
-
     return false
   }
   extensions.extraProperties.set("excludePackagesForSparkConnector", 
::excludePackagesForSparkConnector)
@@ -713,6 +715,7 @@ tasks {
       "copySubprojectDependencies",
       "copySubprojectLib",
       "copyCliLib",
+      "copyJobsLib",
       ":authorizations:copyLibAndConfig",
       ":iceberg:iceberg-rest-server:copyLibAndConfigs",
       ":lance:lance-rest-server:copyLibAndConfigs",
@@ -1015,6 +1018,15 @@ tasks {
     setDuplicatesStrategy(DuplicatesStrategy.EXCLUDE)
   }
 
+  register("copyJobsLib", Copy::class) {
+    dependsOn(":maintenance:jobs:build")
+    from("maintenance/jobs/build/libs")
+    into("distribution/package/auxlib")
+    include("gravitino-jobs-*.jar")
+    exclude("*-empty.jar")
+    setDuplicatesStrategy(DuplicatesStrategy.EXCLUDE)
+  }
+
   register("copySubprojectLib", Copy::class) {
     subprojects.forEach() {
       if (!it.name.startsWith("authorization") &&
diff --git 
a/clients/client-java/src/main/java/org/apache/gravitino/client/ErrorHandlers.java
 
b/clients/client-java/src/main/java/org/apache/gravitino/client/ErrorHandlers.java
index 705731808f..62806e106f 100644
--- 
a/clients/client-java/src/main/java/org/apache/gravitino/client/ErrorHandlers.java
+++ 
b/clients/client-java/src/main/java/org/apache/gravitino/client/ErrorHandlers.java
@@ -34,6 +34,7 @@ import 
org.apache.gravitino.exceptions.ConnectionFailedException;
 import org.apache.gravitino.exceptions.FilesetAlreadyExistsException;
 import org.apache.gravitino.exceptions.ForbiddenException;
 import org.apache.gravitino.exceptions.GroupAlreadyExistsException;
+import org.apache.gravitino.exceptions.IllegalJobTemplateOperationException;
 import org.apache.gravitino.exceptions.IllegalMetadataObjectException;
 import org.apache.gravitino.exceptions.IllegalPrivilegeException;
 import org.apache.gravitino.exceptions.IllegalRoleException;
@@ -1165,7 +1166,13 @@ public class ErrorHandlers {
 
       switch (errorResponse.getCode()) {
         case ErrorConstants.ILLEGAL_ARGUMENTS_CODE:
-          throw new IllegalArgumentException(errorMsg);
+          if (errorResponse
+              .getType()
+              
.equals(IllegalJobTemplateOperationException.class.getSimpleName())) {
+            throw new IllegalJobTemplateOperationException(errorMsg);
+          } else {
+            throw new IllegalArgumentException(errorMsg);
+          }
 
         case ErrorConstants.NOT_FOUND_CODE:
           if 
(errorResponse.getType().equals(NoSuchMetalakeException.class.getSimpleName())) 
{
diff --git 
a/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/JobIT.java
 
b/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/JobIT.java
index f5e66b304d..9a0096513f 100644
--- 
a/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/JobIT.java
+++ 
b/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/JobIT.java
@@ -110,7 +110,6 @@ public class JobIT extends BaseIT {
     Assertions.assertDoesNotThrow(() -> 
metalake.registerJobTemplate(template2));
 
     List<JobTemplate> registeredTemplates = metalake.listJobTemplates();
-    Assertions.assertEquals(2, registeredTemplates.size());
     Assertions.assertTrue(registeredTemplates.contains(template1));
     Assertions.assertTrue(registeredTemplates.contains(template2));
 
@@ -140,7 +139,6 @@ public class JobIT extends BaseIT {
     Assertions.assertDoesNotThrow(() -> 
metalake.registerJobTemplate(template2));
 
     List<JobTemplate> registeredTemplates = metalake.listJobTemplates();
-    Assertions.assertEquals(2, registeredTemplates.size());
     Assertions.assertTrue(registeredTemplates.contains(template1));
     Assertions.assertTrue(registeredTemplates.contains(template2));
 
@@ -160,7 +158,6 @@ public class JobIT extends BaseIT {
 
     // Verify the list of job templates after deletion
     registeredTemplates = metalake.listJobTemplates();
-    Assertions.assertEquals(1, registeredTemplates.size());
     Assertions.assertTrue(registeredTemplates.contains(template2));
 
     // Test deleting a non-existent job template
@@ -172,10 +169,6 @@ public class JobIT extends BaseIT {
     // Verify the second job template is deleted
     Assertions.assertThrows(
         NoSuchJobTemplateException.class, () -> 
metalake.getJobTemplate(template2.name()));
-
-    // Verify the list of job templates is empty after deleting both
-    registeredTemplates = metalake.listJobTemplates();
-    Assertions.assertTrue(registeredTemplates.isEmpty());
   }
 
   @Test
diff --git 
a/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/authorization/JobAuthorizationIT.java
 
b/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/authorization/JobAuthorizationIT.java
index a2841cb9b2..fea6cc8094 100644
--- 
a/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/authorization/JobAuthorizationIT.java
+++ 
b/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/authorization/JobAuthorizationIT.java
@@ -117,11 +117,14 @@ public class JobAuthorizationIT extends 
BaseRestApiAuthorizationIT {
     // Normal user can see job templates they own (test_1, test_2)
     List<JobTemplate> normalUserTemplates =
         normalUserClient.loadMetalake(METALAKE).listJobTemplates();
-    Assertions.assertEquals(2, normalUserTemplates.size());
+    Assertions.assertTrue(normalUserTemplates.stream().anyMatch(s -> 
s.name().equals("test_1")));
+    Assertions.assertTrue(normalUserTemplates.stream().anyMatch(s -> 
s.name().equals("test_2")));
 
     // Admin can see all job templates (test_1, test_2, test_3)
     List<JobTemplate> adminTemplates = 
client.loadMetalake(METALAKE).listJobTemplates();
-    Assertions.assertEquals(3, adminTemplates.size());
+    Assertions.assertTrue(adminTemplates.stream().anyMatch(s -> 
s.name().equals("test_1")));
+    Assertions.assertTrue(adminTemplates.stream().anyMatch(s -> 
s.name().equals("test_2")));
+    Assertions.assertTrue(adminTemplates.stream().anyMatch(s -> 
s.name().equals("test_3")));
   }
 
   @Test
diff --git a/clients/client-python/gravitino/exceptions/base.py 
b/clients/client-python/gravitino/exceptions/base.py
index de7ef01bce..1661dc6559 100644
--- a/clients/client-python/gravitino/exceptions/base.py
+++ b/clients/client-python/gravitino/exceptions/base.py
@@ -181,6 +181,14 @@ class 
JobTemplateAlreadyExistsException(AlreadyExistsException):
     """An exception thrown when a job template with specified name already 
exists."""
 
 
+class IllegalJobTemplateOperationException(IllegalArgumentException):
+    """An exception thrown when an illegal operation is attempted on a job 
template.
+
+    This exception is raised when attempting to modify or delete a built-in 
job template,
+    or when trying to create a user template with a reserved name (e.g., 
starting with 'builtin-').
+    """
+
+
 class NoSuchJobTemplateException(NotFoundException):
     """An exception thrown when a job template with specified name is not 
found."""
 
diff --git 
a/clients/client-python/gravitino/exceptions/handlers/job_error_handler.py 
b/clients/client-python/gravitino/exceptions/handlers/job_error_handler.py
index fc96e4c2b8..a769cd1605 100644
--- a/clients/client-python/gravitino/exceptions/handlers/job_error_handler.py
+++ b/clients/client-python/gravitino/exceptions/handlers/job_error_handler.py
@@ -21,6 +21,7 @@ from gravitino.exceptions.handlers.rest_error_handler import 
RestErrorHandler
 from gravitino.exceptions.base import (
     NoSuchMetalakeException,
     JobTemplateAlreadyExistsException,
+    IllegalJobTemplateOperationException,
     NoSuchJobTemplateException,
     NoSuchJobException,
     InUseException,
@@ -34,6 +35,10 @@ class JobErrorHandler(RestErrorHandler):
         code = error_response.code()
         exception_type = error_response.type()
 
+        if code == ErrorConstants.ILLEGAL_ARGUMENTS_CODE:
+            if exception_type == IllegalJobTemplateOperationException.__name__:
+                raise IllegalJobTemplateOperationException(error_message)
+
         if code == ErrorConstants.NOT_FOUND_CODE:
             if exception_type == NoSuchMetalakeException.__name__:
                 raise NoSuchMetalakeException(error_message)
diff --git a/clients/client-python/tests/integration/test_supports_jobs.py 
b/clients/client-python/tests/integration/test_supports_jobs.py
index 4a5195a530..e613085cea 100644
--- a/clients/client-python/tests/integration/test_supports_jobs.py
+++ b/clients/client-python/tests/integration/test_supports_jobs.py
@@ -95,7 +95,6 @@ class TestSupportsJobs(IntegrationTestEnv):
 
         # List registered job templates
         registered_templates = self._metalake.list_job_templates()
-        self.assertEqual(len(registered_templates), 2)
         self.assertIn(template_1, registered_templates)
         self.assertIn(template_2, registered_templates)
 
@@ -127,7 +126,6 @@ class TestSupportsJobs(IntegrationTestEnv):
 
         # List registered job templates
         registered_templates = self._metalake.list_job_templates()
-        self.assertEqual(len(registered_templates), 2)
         self.assertIn(template1, registered_templates)
         self.assertIn(template2, registered_templates)
 
@@ -150,7 +148,6 @@ class TestSupportsJobs(IntegrationTestEnv):
 
         # Verify the list of job templates after deletion
         registered_templates = self._metalake.list_job_templates()
-        self.assertEqual(len(registered_templates), 1)
         self.assertIn(template2, registered_templates)
 
         # Test deleting a non-existent job template
@@ -163,10 +160,6 @@ class TestSupportsJobs(IntegrationTestEnv):
         with self.assertRaises(NoSuchJobTemplateException):
             self._metalake.get_job_template(template2.name)
 
-        # Verify the list of job templates is empty after deleting both
-        registered_templates = self._metalake.list_job_templates()
-        self.assertTrue(len(registered_templates) == 0)
-
     def test_register_and_alter_job_template(self):
         template = self.builder.with_name("test_alter").build()
         self._metalake.register_job_template(template)
diff --git a/common/src/test/java/org/apache/gravitino/json/TestSerializer.java 
b/common/src/test/java/org/apache/gravitino/json/TestSerializer.java
index a8fff05622..f5e11a9778 100644
--- a/common/src/test/java/org/apache/gravitino/json/TestSerializer.java
+++ b/common/src/test/java/org/apache/gravitino/json/TestSerializer.java
@@ -57,8 +57,8 @@ public class TestSerializer {
     String actualJson =
         
JsonUtils.anyFieldMapper().writeValueAsString(DTOConverters.toDTO(distribution));
     String expectedJson =
-        """
-            
{"strategy":"even","number":10,"funcArgs":[{"type":"field","fieldName":["col1"]}]}""";
+        "{\"strategy\":\"even\",\"number\":10,\"funcArgs\":"
+            + "[{\"type\":\"field\",\"fieldName\":[\"col1\"]}]}";
     Assertions.assertEquals(expectedJson, actualJson);
     DistributionDTO deserialized =
         JsonUtils.anyFieldMapper().readValue(actualJson, 
DistributionDTO.class);
@@ -73,10 +73,10 @@ public class TestSerializer {
                     "bucket", Literals.integerLiteral(10), 
NamedReference.field("col_1")));
     actualJson = 
JsonUtils.anyFieldMapper().writeValueAsString(DTOConverters.toDTO(distribution));
     expectedJson =
-        """
-          
{"strategy":"even","number":10,"funcArgs":[{"type":"function","funcName":"bucket",\
-          
"funcArgs":[{"type":"literal","dataType":"integer","value":"10"},{"type":"field",\
-          "fieldName":["col_1"]}]}]}""";
+        
"{\"strategy\":\"even\",\"number\":10,\"funcArgs\":[{\"type\":\"function\","
+            + "\"funcName\":\"bucket\",\"funcArgs\":[{\"type\":\"literal\","
+            + "\"dataType\":\"integer\",\"value\":\"10\"},{\"type\":\"field\","
+            + "\"fieldName\":[\"col_1\"]}]}]}";
     Assertions.assertEquals(expectedJson, actualJson);
     deserialized = JsonUtils.anyFieldMapper().readValue(actualJson, 
DistributionDTO.class);
     Assertions.assertEquals(distribution, DTOConverters.fromDTO(deserialized));
@@ -90,9 +90,8 @@ public class TestSerializer {
     String actualJson =
         
JsonUtils.anyFieldMapper().writeValueAsString(DTOConverters.toDTO(sortOrder));
     String expectedJson =
-        """
-            
{"sortTerm":{"type":"field","fieldName":["col1"]},"direction":"asc",\
-            "nullOrdering":"nulls_last"}""";
+        "{\"sortTerm\":{\"type\":\"field\",\"fieldName\":[\"col1\"]},"
+            + "\"direction\":\"asc\",\"nullOrdering\":\"nulls_last\"}";
     Assertions.assertEquals(expectedJson, actualJson);
 
     SortOrderDTO deserialized =
@@ -105,9 +104,9 @@ public class TestSerializer {
             SortDirection.DESCENDING);
     actualJson = 
JsonUtils.anyFieldMapper().writeValueAsString(DTOConverters.toDTO(sortOrder));
     expectedJson =
-        """
-          
{"sortTerm":{"type":"function","funcName":"lower","funcArgs":[{"type":"field",\
-          
"fieldName":["col_1"]}]},"direction":"desc","nullOrdering":"nulls_last"}""";
+        "{\"sortTerm\":{\"type\":\"function\",\"funcName\":\"lower\","
+            + "\"funcArgs\":[{\"type\":\"field\",\"fieldName\":[\"col_1\"]}]},"
+            + "\"direction\":\"desc\",\"nullOrdering\":\"nulls_last\"}";
     Assertions.assertEquals(expectedJson, actualJson);
     deserialized = JsonUtils.anyFieldMapper().readValue(actualJson, 
SortOrderDTO.class);
     Assertions.assertEquals(sortOrder, DTOConverters.fromDTO(deserialized));
@@ -121,8 +120,7 @@ public class TestSerializer {
 
     String actualJson = 
JsonUtils.anyFieldMapper().writeValueAsString((DTOConverters.toDTO(index)));
     String expectedJson =
-        """
-            
{"indexType":"PRIMARY_KEY","name":"index_1","fieldNames":[["col1"]]}""";
+        "{\"indexType\":\"PRIMARY_KEY\",\"name\":\"index_1\"," + 
"\"fieldNames\":[[\"col1\"]]}";
     Assertions.assertEquals(expectedJson, actualJson);
     IndexDTO deserialized = JsonUtils.anyFieldMapper().readValue(actualJson, 
IndexDTO.class);
     Assertions.assertEquals(index, DTOConverters.fromDTO(deserialized));
@@ -135,8 +133,8 @@ public class TestSerializer {
                 new String[][] {new String[] {"col1"}, new String[] {"col2"}});
     actualJson = JsonUtils.anyFieldMapper().writeValueAsString(index);
     expectedJson =
-        """
-            
{"indexType":"unique_key","name":"index_2","fieldNames":[["col1"],["col2"]]}""";
+        "{\"indexType\":\"unique_key\",\"name\":\"index_2\","
+            + "\"fieldNames\":[[\"col1\"],[\"col2\"]]}";
     Assertions.assertEquals(expectedJson, actualJson);
   }
 
@@ -146,8 +144,7 @@ public class TestSerializer {
     Transform transform = DayPartitioningDTO.of("dt");
     String actualJson =
         
JsonUtils.anyFieldMapper().writeValueAsString(DTOConverters.toDTO(transform));
-    String expectedJson = """
-          {"strategy":"day","fieldName":["dt"]}""";
+    String expectedJson = "{\"strategy\":\"day\",\"fieldName\":[\"dt\"]}";
     Assertions.assertEquals(expectedJson, actualJson);
 
     Partitioning deserialized =
@@ -175,10 +172,11 @@ public class TestSerializer {
 
     actualJson = JsonUtils.anyFieldMapper().writeValueAsString(transform);
     expectedJson =
-        """
-            
{"type":"range_partitioning","fieldName":["dt"],"assignments":[{"name":"p1",
-            
"properties":null,"upper":{"type":"literal","dataType":"string","value":"2024-01-01"},
-            
"lower":{"type":"literal","dataType":"string","value":"2023-01-01"}}]}""";
+        "{\"type\":\"range_partitioning\",\"fieldName\":[\"dt\"],"
+            + "\"assignments\":[{\"name\":\"p1\",\"properties\":null,"
+            + "\"upper\":{\"type\":\"literal\",\"dataType\":\"string\","
+            + "\"value\":\"2024-01-01\"},\"lower\":{\"type\":\"literal\","
+            + "\"dataType\":\"string\",\"value\":\"2023-01-01\"}}]}";
     Assertions.assertEquals(expectedJson, actualJson);
     deserialized = JsonUtils.anyFieldMapper().readValue(actualJson, 
Partitioning.class);
     Assertions.assertEquals(transform, DTOConverters.fromDTO(deserialized));
diff --git a/core/src/main/java/org/apache/gravitino/GravitinoEnv.java 
b/core/src/main/java/org/apache/gravitino/GravitinoEnv.java
index aa58ddf855..a4403c8126 100644
--- a/core/src/main/java/org/apache/gravitino/GravitinoEnv.java
+++ b/core/src/main/java/org/apache/gravitino/GravitinoEnv.java
@@ -63,8 +63,10 @@ import org.apache.gravitino.hook.SchemaHookDispatcher;
 import org.apache.gravitino.hook.TableHookDispatcher;
 import org.apache.gravitino.hook.TagHookDispatcher;
 import org.apache.gravitino.hook.TopicHookDispatcher;
+import org.apache.gravitino.job.BuiltInJobTemplateEventListener;
 import org.apache.gravitino.job.JobManager;
 import org.apache.gravitino.job.JobOperationDispatcher;
+import org.apache.gravitino.job.JobTemplateValidationDispatcher;
 import org.apache.gravitino.listener.AccessControlEventDispatcher;
 import org.apache.gravitino.listener.CatalogEventDispatcher;
 import org.apache.gravitino.listener.EventBus;
@@ -655,8 +657,16 @@ public class GravitinoEnv {
         new PolicyEventDispatcher(eventBus, new PolicyManager(idGenerator, 
entityStore));
     this.policyDispatcher = new PolicyHookDispatcher(policyEventDispatcher);
 
+    JobManager jobManager = new JobManager(config, entityStore, idGenerator);
+    JobTemplateValidationDispatcher validationDispatcher =
+        new JobTemplateValidationDispatcher(jobManager);
     this.jobOperationDispatcher =
-        new JobEventDispatcher(
-            eventBus, new JobHookDispatcher(new JobManager(config, 
entityStore, idGenerator)));
+        new JobEventDispatcher(eventBus, new 
JobHookDispatcher(validationDispatcher));
+
+    // Register built-in job template event listener to automatically register 
templates
+    // when metalakes are created
+    BuiltInJobTemplateEventListener builtInJobTemplateListener =
+        new BuiltInJobTemplateEventListener(jobManager, entityStore, 
idGenerator);
+    eventListenerManager.addEventListener("builtin-job-template", 
builtInJobTemplateListener);
   }
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/job/BuiltInJobTemplateEventListener.java
 
b/core/src/main/java/org/apache/gravitino/job/BuiltInJobTemplateEventListener.java
new file mode 100644
index 0000000000..a445aa13aa
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/job/BuiltInJobTemplateEventListener.java
@@ -0,0 +1,395 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.job;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Maps;
+import java.io.File;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.net.URLClassLoader;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.ServiceLoader;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.EntityStore;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.exceptions.JobTemplateAlreadyExistsException;
+import org.apache.gravitino.exceptions.NoSuchEntityException;
+import org.apache.gravitino.listener.api.EventListenerPlugin;
+import org.apache.gravitino.listener.api.event.CreateMetalakeEvent;
+import org.apache.gravitino.listener.api.event.Event;
+import org.apache.gravitino.lock.LockType;
+import org.apache.gravitino.lock.TreeLockUtils;
+import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.meta.JobTemplateEntity;
+import org.apache.gravitino.metalake.MetalakeManager;
+import org.apache.gravitino.storage.IdGenerator;
+import org.apache.gravitino.utils.NameIdentifierUtil;
+import org.apache.gravitino.utils.NamespaceUtil;
+import org.apache.gravitino.utils.PrincipalUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Event listener that automatically registers built-in job templates when 
metalakes are created.
+ *
+ * <p>This listener monitors metalake creation events and registers all 
discovered built-in job
+ * templates (via JobTemplateProvider SPI) into the newly created metalake. It 
also handles
+ * registration for existing metalakes on first startup.
+ */
+public class BuiltInJobTemplateEventListener implements EventListenerPlugin {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(BuiltInJobTemplateEventListener.class);
+
+  private static final Pattern BUILTIN_JOB_TEMPLATE_NAME_PATTERN =
+      Pattern.compile(JobTemplateProvider.BUILTIN_NAME_PATTERN);
+  private static final Pattern VERSION_PATTERN =
+      Pattern.compile(JobTemplateProvider.VERSION_VALUE_PATTERN);
+
+  private final JobManager jobManager;
+  private final EntityStore entityStore;
+  private final IdGenerator idGenerator;
+
+  public BuiltInJobTemplateEventListener(
+      JobManager jobManager, EntityStore entityStore, IdGenerator idGenerator) 
{
+    this.jobManager = jobManager;
+    this.entityStore = entityStore;
+    this.idGenerator = idGenerator;
+  }
+
+  @Override
+  public void init(Map<String, String> properties) throws RuntimeException {
+    // No-op: dependencies are injected through the constructor.
+    // This is called from EventListenerManager before start()
+  }
+
+  @Override
+  public void start() throws RuntimeException {
+    // Register built-in job templates for all existing metalakes on first 
startup
+    try {
+      List<String> existingMetalakes = 
MetalakeManager.listInUseMetalakes(entityStore);
+      if (existingMetalakes.isEmpty()) {
+        return;
+      }
+
+      LOG.info(
+          "Registering built-in job templates for {} existing metalakes", 
existingMetalakes.size());
+
+      Map<String, JobTemplate> builtInTemplates = loadBuiltInJobTemplates();
+      if (builtInTemplates.isEmpty()) {
+        LOG.info("No built-in job templates discovered via 
JobTemplateProvider");
+        return;
+      }
+
+      existingMetalakes.forEach(
+          metalake -> {
+            try {
+              reconcileBuiltInJobTemplates(metalake, builtInTemplates);
+            } catch (Exception e) {
+              LOG.error("Failed to register built-in job templates for 
metalake: {}", metalake, e);
+            }
+          });
+
+    } catch (Exception e) {
+      LOG.error("Failed to register built-in job templates for existing 
metalakes", e);
+    }
+  }
+
+  @Override
+  public void stop() throws RuntimeException {
+    // No resources to clean up
+  }
+
+  @Override
+  public void onPostEvent(Event postEvent) throws RuntimeException {
+    if (postEvent instanceof CreateMetalakeEvent) {
+      CreateMetalakeEvent event = (CreateMetalakeEvent) postEvent;
+      String metalakeName = event.identifier().name();
+
+      try {
+        Map<String, JobTemplate> builtInTemplates = loadBuiltInJobTemplates();
+        if (builtInTemplates.isEmpty()) {
+          LOG.debug("No built-in job templates to register for metalake: {}", 
metalakeName);
+          return;
+        }
+
+        reconcileBuiltInJobTemplates(metalakeName, builtInTemplates);
+        LOG.info("Registered built-in job templates for metalake: {}", 
metalakeName);
+      } catch (Exception e) {
+        LOG.error("Failed to register built-in job templates for metalake: 
{}", metalakeName, e);
+      }
+    }
+  }
+
+  @Override
+  public Mode mode() {
+    // Use async isolated to avoid blocking metalake creation
+    return Mode.ASYNC_ISOLATED;
+  }
+
+  /**
+   * Discovers the built-in job templates exposed by {@link JobTemplateProvider} implementations.
+   *
+   * <p>Providers are looked up through {@link ServiceLoader} using the classloader returned by
+   * {@code createAuxlibClassLoader()}. Templates rejected by {@code isValidBuiltInJobTemplate} are
+   * skipped with a warning. When several templates share a name, the one with the highest version
+   * wins; on a tie the first one encountered is kept.
+   *
+   * @return a map from template name to the selected template, empty if none were found
+   */
+  @VisibleForTesting
+  Map<String, JobTemplate> loadBuiltInJobTemplates() {
+    Map<String, JobTemplate> builtInTemplates = Maps.newHashMap();
+
+    // Remember the shared context classloader so that it is never closed in the finally block.
+    ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
+
+    // Load from auxlib directory if available
+    ClassLoader auxlibClassLoader = null;
+    try {
+      auxlibClassLoader = createAuxlibClassLoader();
+      ServiceLoader<JobTemplateProvider> loader =
+          ServiceLoader.load(JobTemplateProvider.class, auxlibClassLoader);
+
+      loader.forEach(
+          provider ->
+              provider
+                  .jobTemplates()
+                  .forEach(
+                      template -> {
+                        if (!isValidBuiltInJobTemplate(template)) {
+                          LOG.warn("Skip invalid built-in job template {}", template.name());
+                          return;
+                        }
+
+                        // Keep only the highest version among templates sharing a name.
+                        JobTemplate existing = builtInTemplates.get(template.name());
+                        int newVersion = version(template.customFields());
+                        int existingVersion =
+                            Optional.ofNullable(existing)
+                                .map(jt -> version(jt.customFields()))
+                                .orElse(0);
+                        if (existing == null || newVersion > existingVersion) {
+                          builtInTemplates.put(template.name(), template);
+                        }
+                      }));
+      return builtInTemplates;
+
+    } finally {
+      // Close only a loader that was created specifically for the auxlib JARs.
+      // createAuxlibClassLoader() falls back to the thread context classloader, which may itself
+      // be a URLClassLoader (for example the application classloader on Java 8). Closing that
+      // shared loader would break class and resource loading for the rest of the process.
+      if (auxlibClassLoader instanceof URLClassLoader
+          && auxlibClassLoader != contextClassLoader) {
+        try {
+          ((URLClassLoader) auxlibClassLoader).close();
+        } catch (IOException e) {
+          LOG.warn("Failed to close auxlib classloader", e);
+        }
+      }
+    }
+  }
+
+  /**
+   * Picks the classloader used to look up built-in job template providers.
+   *
+   * <p>A dedicated {@link URLClassLoader} over the {@code gravitino-jobs-*.jar} files found in
+   * {@code $GRAVITINO_HOME/auxlib} is returned when such files exist. In every other case (the
+   * environment variable is unset, the directory is missing, no JAR matches, or building the
+   * loader fails) the current thread's context classloader is returned instead.
+   *
+   * @return the classloader to search for {@link JobTemplateProvider} implementations
+   */
+  private ClassLoader createAuxlibClassLoader() {
+    String home = System.getenv("GRAVITINO_HOME");
+    if (home == null) {
+      LOG.warn("GRAVITINO_HOME not set, using current classloader for built-in job templates");
+      return Thread.currentThread().getContextClassLoader();
+    }
+
+    File auxlib = new File(home, "auxlib");
+    boolean usableDirectory = auxlib.exists() && auxlib.isDirectory();
+    if (!usableDirectory) {
+      LOG.warn(
+          "Auxlib directory {} does not exist, using current classloader for built-in job templates",
+          auxlib.getAbsolutePath());
+      return Thread.currentThread().getContextClassLoader();
+    }
+
+    // Only gravitino-jobs-*.jar files are considered
+    File[] jobJars =
+        auxlib.listFiles(
+            (parent, fileName) ->
+                fileName.startsWith("gravitino-jobs-") && fileName.endsWith(".jar"));
+    if (jobJars == null || jobJars.length == 0) {
+      LOG.info(
+          "No gravitino-jobs JAR files found in auxlib directory {}", auxlib.getAbsolutePath());
+      return Thread.currentThread().getContextClassLoader();
+    }
+
+    try {
+      URI[] jarUris = Arrays.stream(jobJars).map(File::toURI).toArray(URI[]::new);
+      LOG.info(
+          "Loading built-in job templates from {} gravitino-jobs JAR file(s) in auxlib directory",
+          jarUris.length);
+
+      java.net.URL[] jarUrls = new java.net.URL[jarUris.length];
+      for (int i = 0; i < jarUris.length; i++) {
+        try {
+          jarUrls[i] = jarUris[i].toURL();
+        } catch (MalformedURLException e) {
+          throw new RuntimeException("Failed to convert URI to URL: " + jarUris[i], e);
+        }
+      }
+      return new URLClassLoader(jarUrls, Thread.currentThread().getContextClassLoader());
+    } catch (Exception e) {
+      // Any failure, including the conversion error rethrown above, falls back to the context
+      // classloader.
+      LOG.error("Failed to create auxlib classloader", e);
+      return Thread.currentThread().getContextClassLoader();
+    }
+  }
+
+  /**
+   * Tells whether a provided template qualifies as a built-in job template.
+   *
+   * @param jobTemplate the candidate template
+   * @return {@code true} only if the name passes {@code isValidBuiltInName} and {@code
+   *     getVersion} can extract a version from the custom fields
+   */
+  private boolean isValidBuiltInJobTemplate(JobTemplate jobTemplate) {
+    return isValidBuiltInName(jobTemplate.name())
+        && getVersion(jobTemplate.customFields()).isPresent();
+  }
+
+  /**
+   * Brings the built-in job templates stored under a metalake in line with the provided ones.
+   *
+   * <p>Provided templates that are not stored yet are registered, stored templates with a lower
+   * version than the provided one are updated, and stored built-in templates that are no longer
+   * provided are deleted.
+   *
+   * @param metalake the name of the metalake to reconcile
+   * @param builtInTemplates the provided built-in templates keyed by name
+   */
+  @VisibleForTesting
+  void reconcileBuiltInJobTemplates(String metalake, Map<String, JobTemplate> builtInTemplates) {
+    // Stored built-ins not yet matched to a provided template; whatever is left at the end is
+    // obsolete.
+    Map<String, JobTemplateEntity> unmatched =
+        jobManager.listJobTemplates(metalake).stream()
+            .filter(stored -> isValidBuiltInName(stored.name()))
+            .collect(Collectors.toMap(JobTemplateEntity::name, stored -> stored));
+
+    for (Map.Entry<String, JobTemplate> entry : builtInTemplates.entrySet()) {
+      String name = entry.getKey();
+      JobTemplate provided = entry.getValue();
+
+      JobTemplateEntity stored = unmatched.remove(name);
+      if (stored == null) {
+        registerNewBuiltInJobTemplate(metalake, provided);
+        continue;
+      }
+
+      int storedVersion = version(stored.templateContent().customFields());
+      int providedVersion = version(provided.customFields());
+      if (providedVersion > storedVersion) {
+        updateBuiltInJobTemplate(metalake, stored, provided);
+      } else {
+        LOG.info("Built-in job template {} under metalake {} is up to date", name, metalake);
+      }
+    }
+
+    for (JobTemplateEntity obsolete : unmatched.values()) {
+      deleteObsoleteBuiltInJobTemplate(metalake, obsolete);
+    }
+  }
+
+  /**
+   * Registers a provided built-in job template under the given metalake.
+   *
+   * <p>If a template with the same name already exists, a warning is logged and nothing else
+   * happens.
+   *
+   * @param metalake the name of the metalake
+   * @param jobTemplate the built-in job template to register
+   */
+  private void registerNewBuiltInJobTemplate(String metalake, JobTemplate jobTemplate) {
+    AuditInfo creationAudit =
+        AuditInfo.builder()
+            .withCreator(PrincipalUtils.getCurrentPrincipal().getName())
+            .withCreateTime(Instant.now())
+            .build();
+
+    JobTemplateEntity entity =
+        JobTemplateEntity.builder()
+            .withId(idGenerator.nextId())
+            .withName(jobTemplate.name())
+            .withNamespace(NamespaceUtil.ofJobTemplate(metalake))
+            .withComment(jobTemplate.comment())
+            .withTemplateContent(JobTemplateEntity.TemplateContent.fromJobTemplate(jobTemplate))
+            .withAuditInfo(creationAudit)
+            .build();
+
+    try {
+      jobManager.registerJobTemplate(metalake, entity);
+    } catch (JobTemplateAlreadyExistsException e) {
+      LOG.warn(
+          "Built-in job template {} already exists under metalake {}, skip registering",
+          jobTemplate.name(),
+          metalake,
+          e);
+      return;
+    }
+
+    LOG.info(
+        "Registered built-in job template {} under metalake {}", jobTemplate.name(), metalake);
+  }
+
+  /**
+   * Overwrites a stored built-in job template with a newer definition.
+   *
+   * <p>The update is performed directly on the entity store while holding the write tree lock of
+   * the job template identifier.
+   *
+   * @param metalake the name of the metalake
+   * @param existing the currently stored entity, used here only to log the previous version
+   * @param newTemplate the newer built-in job template
+   * @throws RuntimeException if the stored entity is missing or the store fails
+   */
+  private void updateBuiltInJobTemplate(
+      String metalake, JobTemplateEntity existing, JobTemplate newTemplate) {
+    NameIdentifier ident = NameIdentifierUtil.ofJobTemplate(metalake, newTemplate.name());
+
+    TreeLockUtils.doWithTreeLock(
+        ident,
+        LockType.WRITE,
+        () -> {
+          try {
+            return entityStore.update(
+                ident,
+                JobTemplateEntity.class,
+                Entity.EntityType.JOB_TEMPLATE,
+                stored -> rebuildBuiltInJobTemplate(stored, newTemplate));
+          } catch (NoSuchEntityException | IOException e) {
+            throw new RuntimeException(
+                String.format(
+                    "Failed to update built-in job template %s under metalake %s",
+                    newTemplate.name(), metalake),
+                e);
+          }
+        });
+
+    LOG.info(
+        "Updated built-in job template {} under metalake {} from v{} to v{}",
+        newTemplate.name(),
+        metalake,
+        version(existing.templateContent().customFields()),
+        version(newTemplate.customFields()));
+  }
+
+  /**
+   * Builds the replacement entity for an update: id, namespace and creation audit come from the
+   * stored entity, while name, comment and content come from the new template.
+   */
+  private JobTemplateEntity rebuildBuiltInJobTemplate(
+      JobTemplateEntity stored, JobTemplate newTemplate) {
+    return JobTemplateEntity.builder()
+        .withId(stored.id())
+        .withName(newTemplate.name())
+        .withNamespace(stored.namespace())
+        .withComment(newTemplate.comment())
+        .withTemplateContent(JobTemplateEntity.TemplateContent.fromJobTemplate(newTemplate))
+        .withAuditInfo(
+            AuditInfo.builder()
+                .withCreator(stored.auditInfo().creator())
+                .withCreateTime(stored.auditInfo().createTime())
+                .withLastModifier(PrincipalUtils.getCurrentPrincipal().getName())
+                .withLastModifiedTime(Instant.now())
+                .build())
+        .build();
+  }
+
+  /**
+   * Deletes a stored built-in job template that is no longer provided.
+   *
+   * <p>This is best effort: any failure is logged as a warning and not rethrown.
+   *
+   * @param metalake the name of the metalake
+   * @param jobTemplate the stored built-in job template to delete
+   */
+  private void deleteObsoleteBuiltInJobTemplate(String metalake, JobTemplateEntity jobTemplate) {
+    try {
+      boolean deleted = jobManager.deleteJobTemplate(metalake, jobTemplate.name());
+      if (!deleted) {
+        return;
+      }
+
+      LOG.info(
+          "Deleted obsolete built-in job template {} under metalake {}",
+          jobTemplate.name(),
+          metalake);
+    } catch (Exception e) {
+      LOG.warn(
+          "Failed to delete obsolete built-in job template {} under metalake {}",
+          jobTemplate.name(),
+          metalake,
+          e);
+    }
+  }
+
+  /**
+   * Checks a job template name against {@code BUILTIN_JOB_TEMPLATE_NAME_PATTERN}.
+   *
+   * @param name the job template name to check
+   * @return {@code true} if the whole name matches the pattern
+   */
+  private boolean isValidBuiltInName(String name) {
+    Matcher nameMatcher = BUILTIN_JOB_TEMPLATE_NAME_PATTERN.matcher(name);
+    return nameMatcher.matches();
+  }
+
+  /**
+   * Extracts the numeric version of a job template from its custom fields.
+   *
+   * <p>The value stored under {@link JobTemplateProvider#PROPERTY_VERSION_KEY} must be non-blank
+   * and match {@code VERSION_PATTERN}; the number is parsed from everything after the first
+   * character of that value.
+   *
+   * @param customFields the custom fields of a job template, may be {@code null}
+   * @return the parsed version, or empty if the field is absent, blank, does not match the
+   *     pattern, or does not fit in an {@code int}
+   */
+  private Optional<Integer> getVersion(Map<String, String> customFields) {
+    return Optional.ofNullable(customFields)
+        .map(fields -> fields.get(JobTemplateProvider.PROPERTY_VERSION_KEY))
+        .filter(StringUtils::isNotBlank)
+        .flatMap(
+            version -> {
+              Matcher matcher = VERSION_PATTERN.matcher(version);
+              if (!matcher.matches()) {
+                return Optional.empty();
+              }
+              try {
+                return Optional.of(Integer.parseInt(version.substring(1)));
+              } catch (NumberFormatException e) {
+                // A value can match the pattern and still be unparsable as an int, e.g. a digit
+                // run that overflows. Report "no version" instead of letting the exception abort
+                // the caller's whole load or reconcile pass.
+                LOG.warn("Ignore unparsable built-in job template version {}", version, e);
+                return Optional.empty();
+              }
+            });
+  }
+
+  /**
+   * Returns the version found in the custom fields, defaulting to {@code 0}.
+   *
+   * @param customFields the custom fields of a job template, may be {@code null}
+   * @return the version extracted by {@code getVersion}, or {@code 0} if there is none
+   */
+  private int version(Map<String, String> customFields) {
+    Optional<Integer> parsed = getVersion(customFields);
+    return parsed.orElse(0);
+  }
+}
diff --git a/core/src/main/java/org/apache/gravitino/job/JobManager.java 
b/core/src/main/java/org/apache/gravitino/job/JobManager.java
index 49d3a900b9..dd68b6c161 100644
--- a/core/src/main/java/org/apache/gravitino/job/JobManager.java
+++ b/core/src/main/java/org/apache/gravitino/job/JobManager.java
@@ -19,7 +19,6 @@
 
 package org.apache.gravitino.job;
 
-import static org.apache.gravitino.Metalake.PROPERTY_IN_USE;
 import static org.apache.gravitino.metalake.MetalakeManager.checkMetalake;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -58,9 +57,9 @@ import 
org.apache.gravitino.exceptions.NoSuchJobTemplateException;
 import org.apache.gravitino.lock.LockType;
 import org.apache.gravitino.lock.TreeLockUtils;
 import org.apache.gravitino.meta.AuditInfo;
-import org.apache.gravitino.meta.BaseMetalake;
 import org.apache.gravitino.meta.JobEntity;
 import org.apache.gravitino.meta.JobTemplateEntity;
+import org.apache.gravitino.metalake.MetalakeManager;
 import org.apache.gravitino.storage.IdGenerator;
 import org.apache.gravitino.utils.NameIdentifierUtil;
 import org.apache.gravitino.utils.NamespaceUtil;
@@ -536,7 +535,7 @@ public class JobManager implements JobOperationDispatcher {
 
   @VisibleForTesting
   void pullAndUpdateJobStatus() {
-    List<String> metalakes = listInUseMetalakes(entityStore);
+    List<String> metalakes = MetalakeManager.listInUseMetalakes(entityStore);
     for (String metalake : metalakes) {
       // This unnecessary list all the jobs, we need to improve the code to 
only list the active
       // jobs.
@@ -626,7 +625,7 @@ public class JobManager implements JobOperationDispatcher {
 
   @VisibleForTesting
   void cleanUpStagingDirs() {
-    List<String> metalakes = listInUseMetalakes(entityStore);
+    List<String> metalakes = MetalakeManager.listInUseMetalakes(entityStore);
 
     for (String metalake : metalakes) {
       List<JobEntity> finishedJobs =
@@ -823,24 +822,6 @@ public class JobManager implements JobOperationDispatcher {
     }
   }
 
-  private static List<String> listInUseMetalakes(EntityStore entityStore) {
-    try {
-      List<BaseMetalake> metalakes =
-          TreeLockUtils.doWithRootTreeLock(
-              LockType.READ,
-              () ->
-                  entityStore.list(
-                      Namespace.empty(), BaseMetalake.class, 
Entity.EntityType.METALAKE));
-      return metalakes.stream()
-          .filter(
-              m -> (boolean) 
m.propertiesMetadata().getOrDefault(m.properties(), PROPERTY_IN_USE))
-          .map(BaseMetalake::name)
-          .collect(Collectors.toList());
-    } catch (IOException e) {
-      throw new RuntimeException("Failed to list in-use metalakes", e);
-    }
-  }
-
   @VisibleForTesting
   JobTemplateEntity updateJobTemplateEntity(
       NameIdentifier jobTemplateIdent,
diff --git 
a/core/src/main/java/org/apache/gravitino/job/JobTemplateValidationDispatcher.java
 
b/core/src/main/java/org/apache/gravitino/job/JobTemplateValidationDispatcher.java
new file mode 100644
index 0000000000..6a25735a5b
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/job/JobTemplateValidationDispatcher.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.job;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.gravitino.exceptions.IllegalJobTemplateOperationException;
+import org.apache.gravitino.exceptions.InUseException;
+import org.apache.gravitino.exceptions.JobTemplateAlreadyExistsException;
+import org.apache.gravitino.exceptions.NoSuchJobException;
+import org.apache.gravitino.exceptions.NoSuchJobTemplateException;
+import org.apache.gravitino.meta.JobEntity;
+import org.apache.gravitino.meta.JobTemplateEntity;
+
+/**
+ * A {@link JobOperationDispatcher} decorator that guards built-in job templates against user
+ * operations before delegating to the wrapped dispatcher.
+ *
+ * <p>A job template counts as built-in when its name starts with {@link
+ * JobTemplateProvider#BUILTIN_NAME_PREFIX}. Such templates are managed by the system, so this
+ * decorator rejects the following requests with {@link IllegalJobTemplateOperationException}:
+ *
+ * <ul>
+ *   <li>registering a job template whose name uses the built-in prefix
+ *   <li>altering a built-in job template
+ *   <li>deleting a built-in job template
+ *   <li>renaming a job template to a name that uses the built-in prefix
+ * </ul>
+ *
+ * <p>Every other operation is forwarded unchanged.
+ */
+public class JobTemplateValidationDispatcher implements JobOperationDispatcher {
+
+  private final JobOperationDispatcher dispatcher;
+
+  /**
+   * Wraps the given dispatcher with built-in job template validation.
+   *
+   * @param dispatcher the dispatcher that performs the actual operations
+   */
+  public JobTemplateValidationDispatcher(JobOperationDispatcher dispatcher) {
+    this.dispatcher = dispatcher;
+  }
+
+  /** Forwards to the wrapped dispatcher without validation. */
+  @Override
+  public List<JobTemplateEntity> listJobTemplates(String metalake) {
+    return dispatcher.listJobTemplates(metalake);
+  }
+
+  /** Rejects names reserved for built-in templates, then forwards the registration. */
+  @Override
+  public void registerJobTemplate(String metalake, JobTemplateEntity jobTemplateEntity)
+      throws JobTemplateAlreadyExistsException {
+    validateNotBuiltInTemplateName(jobTemplateEntity.name());
+    dispatcher.registerJobTemplate(metalake, jobTemplateEntity);
+  }
+
+  /** Forwards to the wrapped dispatcher without validation. */
+  @Override
+  public JobTemplateEntity getJobTemplate(String metalake, String jobTemplateName)
+      throws NoSuchJobTemplateException {
+    return dispatcher.getJobTemplate(metalake, jobTemplateName);
+  }
+
+  /** Rejects deletion of built-in templates, then forwards the deletion. */
+  @Override
+  public boolean deleteJobTemplate(String metalake, String jobTemplateName) throws InUseException {
+    validateNotBuiltInTemplate(jobTemplateName, "delete");
+    return dispatcher.deleteJobTemplate(metalake, jobTemplateName);
+  }
+
+  /**
+   * Rejects changes to built-in templates and renames to a reserved name, then forwards the
+   * alteration. When several rename changes are supplied, only the last one is validated.
+   */
+  @Override
+  public JobTemplateEntity alterJobTemplate(
+      String metalake, String jobTemplateName, JobTemplateChange... changes)
+      throws NoSuchJobTemplateException, IllegalArgumentException {
+    validateNotBuiltInTemplate(jobTemplateName, "alter");
+
+    // Validate the target name of the last rename change, if any
+    Arrays.stream(changes)
+        .filter(change -> change instanceof JobTemplateChange.RenameJobTemplate)
+        .map(change -> ((JobTemplateChange.RenameJobTemplate) change).getNewName())
+        .reduce((earlier, later) -> later)
+        .ifPresent(this::validateNotBuiltInTemplateName);
+
+    return dispatcher.alterJobTemplate(metalake, jobTemplateName, changes);
+  }
+
+  /** Forwards to the wrapped dispatcher without validation. */
+  @Override
+  public List<JobEntity> listJobs(String metalake, Optional<String> jobTemplateName)
+      throws NoSuchJobTemplateException {
+    return dispatcher.listJobs(metalake, jobTemplateName);
+  }
+
+  /** Forwards to the wrapped dispatcher without validation. */
+  @Override
+  public JobEntity getJob(String metalake, String jobId) throws NoSuchJobException {
+    return dispatcher.getJob(metalake, jobId);
+  }
+
+  /** Forwards to the wrapped dispatcher without validation. */
+  @Override
+  public JobEntity runJob(String metalake, String jobTemplateName, Map<String, String> jobConf)
+      throws NoSuchJobTemplateException {
+    return dispatcher.runJob(metalake, jobTemplateName, jobConf);
+  }
+
+  /** Forwards to the wrapped dispatcher without validation. */
+  @Override
+  public JobEntity cancelJob(String metalake, String jobId) throws NoSuchJobException {
+    return dispatcher.cancelJob(metalake, jobId);
+  }
+
+  /** Closes the wrapped dispatcher. */
+  @Override
+  public void close() throws IOException {
+    dispatcher.close();
+  }
+
+  /**
+   * Ensures that a name chosen by a user is not reserved for built-in templates.
+   *
+   * @param templateName the job template name to validate
+   * @throws IllegalJobTemplateOperationException if the name starts with the built-in prefix
+   */
+  @VisibleForTesting
+  void validateNotBuiltInTemplateName(String templateName) {
+    if (!isBuiltInTemplateName(templateName)) {
+      return;
+    }
+
+    throw new IllegalJobTemplateOperationException(
+        "Job template name '%s' is reserved for built-in templates. "
+            + "User-created job templates cannot have names starting with '%s'",
+        templateName, JobTemplateProvider.BUILTIN_NAME_PREFIX);
+  }
+
+  /**
+   * Ensures that the target of an operation is not a built-in template.
+   *
+   * @param templateName the job template name to validate
+   * @param operation the operation being attempted (e.g., "delete", "alter"), used in the message
+   * @throws IllegalJobTemplateOperationException if the template is a built-in template
+   */
+  @VisibleForTesting
+  void validateNotBuiltInTemplate(String templateName, String operation) {
+    if (!isBuiltInTemplateName(templateName)) {
+      return;
+    }
+
+    throw new IllegalJobTemplateOperationException(
+        "Cannot %s built-in job template '%s'. "
+            + "Built-in job templates are managed by the system and cannot be modified or deleted"
+            + " by users",
+        operation, templateName);
+  }
+
+  /**
+   * Tells whether a template name belongs to the built-in namespace.
+   *
+   * @param templateName the template name to check, may be {@code null}
+   * @return true if the name is non-null and starts with the built-in prefix, false otherwise
+   */
+  @VisibleForTesting
+  static boolean isBuiltInTemplateName(String templateName) {
+    if (templateName == null) {
+      return false;
+    }
+    return templateName.startsWith(JobTemplateProvider.BUILTIN_NAME_PREFIX);
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/metalake/MetalakeManager.java 
b/core/src/main/java/org/apache/gravitino/metalake/MetalakeManager.java
index 9b2a968acf..fd6592d727 100644
--- a/core/src/main/java/org/apache/gravitino/metalake/MetalakeManager.java
+++ b/core/src/main/java/org/apache/gravitino/metalake/MetalakeManager.java
@@ -28,6 +28,7 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 import org.apache.gravitino.Entity.EntityType;
 import org.apache.gravitino.EntityAlreadyExistsException;
 import org.apache.gravitino.EntityStore;
@@ -129,6 +130,29 @@ public class MetalakeManager implements 
MetalakeDispatcher, Closeable {
     }
   }
 
+  /**
+   * Lists all in-use Metalakes.
+   *
+   * @param entityStore The EntityStore to use for managing Metalakes.
+   * @return A list of names of in-use Metalakes.
+   * @throws RuntimeException If listing in-use Metalakes encounters storage 
issues.
+   */
+  public static List<String> listInUseMetalakes(EntityStore entityStore) {
+    try {
+      List<BaseMetalake> metalakes =
+          TreeLockUtils.doWithRootTreeLock(
+              LockType.READ,
+              () -> entityStore.list(Namespace.empty(), BaseMetalake.class, 
EntityType.METALAKE));
+      return metalakes.stream()
+          .filter(
+              m -> (boolean) 
m.propertiesMetadata().getOrDefault(m.properties(), PROPERTY_IN_USE))
+          .map(BaseMetalake::name)
+          .collect(Collectors.toList());
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to list in-use metalakes", e);
+    }
+  }
+
   /**
    * Lists all available Metalakes.
    *
@@ -151,6 +175,7 @@ public class MetalakeManager implements MetalakeDispatcher, 
Closeable {
       throw new RuntimeException(ioe);
     }
   }
+
   /**
    * Loads a Metalake.
    *
diff --git 
a/core/src/test/java/org/apache/gravitino/job/TestBuiltInJobTemplateEventListener.java
 
b/core/src/test/java/org/apache/gravitino/job/TestBuiltInJobTemplateEventListener.java
new file mode 100644
index 0000000000..e5273e6f83
--- /dev/null
+++ 
b/core/src/test/java/org/apache/gravitino/job/TestBuiltInJobTemplateEventListener.java
@@ -0,0 +1,525 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.job;
+
+import static org.apache.gravitino.Configs.TREE_LOCK_CLEAN_INTERVAL;
+import static org.apache.gravitino.Configs.TREE_LOCK_MAX_NODE_IN_MEMORY;
+import static org.apache.gravitino.Configs.TREE_LOCK_MIN_NODE_IN_MEMORY;
+import static org.apache.gravitino.Metalake.PROPERTY_IN_USE;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.ServiceLoader;
+import java.util.function.Consumer;
+import org.apache.gravitino.Config;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.EntityStore;
+import org.apache.gravitino.GravitinoEnv;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.exceptions.JobTemplateAlreadyExistsException;
+import org.apache.gravitino.listener.api.event.CreateMetalakeEvent;
+import org.apache.gravitino.listener.api.event.Event;
+import org.apache.gravitino.listener.api.info.MetalakeInfo;
+import org.apache.gravitino.lock.LockManager;
+import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.meta.BaseMetalake;
+import org.apache.gravitino.meta.JobTemplateEntity;
+import org.apache.gravitino.meta.SchemaVersion;
+import org.apache.gravitino.storage.IdGenerator;
+import org.apache.gravitino.storage.RandomIdGenerator;
+import org.apache.gravitino.storage.memory.TestMemoryEntityStore;
+import org.apache.gravitino.utils.NamespaceUtil;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+import org.testcontainers.shaded.org.apache.commons.lang3.reflect.FieldUtils;
+
+public class TestBuiltInJobTemplateEventListener {
+
+  private BuiltInJobTemplateEventListener listener;
+  private JobManager jobManager;
+  private EntityStore entityStore;
+  private IdGenerator idGenerator;
+  private Config config;
+
+  /**
+   * Builds a fresh fixture for every test: a mocked {@link Config}, an in-memory entity store, a
+   * lock manager injected into {@link GravitinoEnv}, a mocked {@link JobManager} and the listener
+   * under test.
+   */
+  @BeforeEach
+  public void setUp() throws IllegalAccessException {
+    config = Mockito.mock(Config.class);
+
+    // Tree lock settings are stubbed before the config is handed to the lock manager below
+    doReturn(100000L).when(config).get(TREE_LOCK_MAX_NODE_IN_MEMORY);
+    doReturn(1000L).when(config).get(TREE_LOCK_MIN_NODE_IN_MEMORY);
+    doReturn(36000L).when(config).get(TREE_LOCK_CLEAN_INTERVAL);
+
+    entityStore = new TestMemoryEntityStore.InMemoryEntityStore();
+    entityStore.initialize(config);
+
+    // Inject the lock manager into the GravitinoEnv singleton via reflection (forced access)
+    FieldUtils.writeField(GravitinoEnv.getInstance(), "lockManager", new 
LockManager(config), true);
+    idGenerator = new RandomIdGenerator();
+
+    // The job manager is a mock so that tests can verify the calls made by the listener
+    jobManager = mock(JobManager.class);
+    listener = new BuiltInJobTemplateEventListener(jobManager, entityStore, 
idGenerator);
+  }
+
+  /** Closes the entity store created in {@code setUp}, if there is one. */
+  @AfterEach
+  public void tearDown() throws IOException {
+    if (entityStore == null) {
+      return;
+    }
+    entityStore.close();
+  }
+
+  /**
+   * Verifies that a {@link CreateMetalakeEvent} makes the listener register the single built-in
+   * template returned by a mocked provider when the metalake has no job templates yet.
+   */
+  @Test
+  public void testOnPostCreateMetalakeEvent() {
+    String metalakeName = "test_metalake";
+    NameIdentifier identifier = NameIdentifier.of(metalakeName);
+    MetalakeInfo metalakeInfo = new MetalakeInfo(metalakeName, "comment", 
null, null);
+
+    CreateMetalakeEvent event = new CreateMetalakeEvent("user", identifier, 
metalakeInfo);
+
+    // Mock a provider that returns one valid built-in template
+    JobTemplateProvider provider = mock(JobTemplateProvider.class);
+    ShellJobTemplate template =
+        ShellJobTemplate.builder()
+            .withName("builtin-test")
+            .withComment("test template")
+            .withExecutable("/bin/echo")
+            .withCustomFields(Collections.singletonMap("version", "v1"))
+            .build();
+
+    when(provider.jobTemplates()).thenAnswer(invocation -> 
Collections.singletonList(template));
+
+    // ServiceLoader.load is mocked statically; the mock is only active inside this try block
+    try (MockedStatic<ServiceLoader> mockedLoader = 
mockStatic(ServiceLoader.class)) {
+      ServiceLoader<JobTemplateProvider> serviceLoader = 
mock(ServiceLoader.class);
+      mockedLoader
+          .when(() -> ServiceLoader.load(eq(JobTemplateProvider.class), any()))
+          .thenReturn(serviceLoader);
+      
when(serviceLoader.iterator()).thenReturn(Lists.newArrayList(provider).iterator());
+      
when(serviceLoader.spliterator()).thenReturn(Lists.newArrayList(provider).spliterator());
+      // forEach on the mocked loader hands the mocked provider to the consumer
+      Mockito.doAnswer(
+              invocation -> {
+                Consumer<JobTemplateProvider> consumer = 
invocation.getArgument(0);
+                consumer.accept(provider);
+                return null;
+              })
+          .when(serviceLoader)
+          .forEach(any());
+
+      // Mock jobManager methods
+      
when(jobManager.listJobTemplates(metalakeName)).thenReturn(Collections.emptyList());
+      doNothing().when(jobManager).registerJobTemplate(eq(metalakeName), 
any());
+
+      listener.onPostEvent(event);
+
+      // Verify registerJobTemplate was called
+      verify(jobManager, times(1)).registerJobTemplate(eq(metalakeName), 
any());
+    }
+  }
+
+  /** Verifies that events other than metalake creation leave the job manager untouched. */
+  @Test
+  public void testOnPostEventWithNonMetalakeEvent() {
+    Event unrelatedEvent = mock(Event.class);
+
+    // The listener must ignore the event without throwing
+    listener.onPostEvent(unrelatedEvent);
+
+    // Neither listing nor registration may have happened
+    verify(jobManager, times(0)).listJobTemplates(any());
+    verify(jobManager, times(0)).registerJobTemplate(any(), any());
+  }
+
+  /**
+   * Verifies that {@code start()} reconciles built-in templates for the two in-use metalakes and
+   * skips the one that is not in use.
+   */
+  @Test
+  public void testStartWithExistingMetalakes() throws IOException {
+    // Create real metalakes in entity store
+    createMetalake("metalake1", true);
+    createMetalake("metalake2", true);
+    createMetalake("metalake3", false); // not in use
+
+    // Mock a provider that returns one valid built-in template
+    JobTemplateProvider provider = mock(JobTemplateProvider.class);
+    ShellJobTemplate template =
+        ShellJobTemplate.builder()
+            .withName("builtin-test")
+            .withComment("test")
+            .withExecutable("/bin/echo")
+            .withCustomFields(Collections.singletonMap("version", "v1"))
+            .build();
+    when(provider.jobTemplates()).thenAnswer(invocation -> 
Collections.singletonList(template));
+
+    // ServiceLoader.load is mocked statically; the mock is only active inside this try block
+    try (MockedStatic<ServiceLoader> mockedLoader = 
mockStatic(ServiceLoader.class)) {
+      ServiceLoader<JobTemplateProvider> serviceLoader = 
mock(ServiceLoader.class);
+      mockedLoader
+          .when(() -> ServiceLoader.load(eq(JobTemplateProvider.class), any()))
+          .thenReturn(serviceLoader);
+      
when(serviceLoader.iterator()).thenReturn(Lists.newArrayList(provider).iterator());
+      
when(serviceLoader.spliterator()).thenReturn(Lists.newArrayList(provider).spliterator());
+      // forEach on the mocked loader hands the mocked provider to the consumer
+      Mockito.doAnswer(
+              invocation -> {
+                Consumer<JobTemplateProvider> consumer = 
invocation.getArgument(0);
+                consumer.accept(provider);
+                return null;
+              })
+          .when(serviceLoader)
+          .forEach(any());
+
+      // No job templates exist in any metalake, so every provided template is new
+      
when(jobManager.listJobTemplates(any())).thenReturn(Collections.emptyList());
+      doNothing().when(jobManager).registerJobTemplate(any(), any());
+
+      listener.start();
+
+      // Should only register for metalake1 and metalake2 (in-use metalakes)
+      verify(jobManager, times(1)).listJobTemplates("metalake1");
+      verify(jobManager, times(1)).listJobTemplates("metalake2");
+      verify(jobManager, times(0)).listJobTemplates("metalake3");
+      verify(jobManager, times(2)).registerJobTemplate(any(), any());
+    }
+  }
+
+  /** Verifies that {@code start()} does not touch the job manager when no metalake exists. */
+  @Test
+  public void testStartWithNoExistingMetalakes() throws IOException {
+    // No metalakes created, entityStore is empty
+    listener.start();
+
+    // Should not interact with jobManager
+    verify(jobManager, times(0)).listJobTemplates(any());
+    verify(jobManager, times(0)).registerJobTemplate(any(), any());
+  }
+
+  /**
+   * Verifies that when a provider returns two templates with the same name, {@code
+   * loadBuiltInJobTemplates()} keeps only the one with the higher version.
+   */
+  @Test
+  public void testLoadBuiltInJobTemplatesWithValidTemplates() {
+    JobTemplateProvider provider = mock(JobTemplateProvider.class);
+
+    // Two templates sharing the name "builtin-sparkpi" but carrying different versions
+    ShellJobTemplate template1 =
+        ShellJobTemplate.builder()
+            .withName("builtin-sparkpi")
+            .withComment("v1")
+            .withExecutable("/bin/echo")
+            .withCustomFields(Collections.singletonMap("version", "v1"))
+            .build();
+
+    ShellJobTemplate template2 =
+        ShellJobTemplate.builder()
+            .withName("builtin-sparkpi")
+            .withComment("v2")
+            .withExecutable("/bin/echo")
+            .withCustomFields(Collections.singletonMap("version", "v2"))
+            .build();
+
+    when(provider.jobTemplates()).thenAnswer(invocation -> 
Arrays.asList(template1, template2));
+
+    // ServiceLoader.load is mocked statically; the mock is only active inside this try block
+    try (MockedStatic<ServiceLoader> mockedLoader = 
mockStatic(ServiceLoader.class)) {
+      ServiceLoader<JobTemplateProvider> serviceLoader = 
mock(ServiceLoader.class);
+      mockedLoader
+          .when(() -> ServiceLoader.load(eq(JobTemplateProvider.class), any()))
+          .thenReturn(serviceLoader);
+      
when(serviceLoader.iterator()).thenReturn(Lists.newArrayList(provider).iterator());
+      
when(serviceLoader.spliterator()).thenReturn(Lists.newArrayList(provider).spliterator());
+      // forEach on the mocked loader hands the mocked provider to the consumer
+      Mockito.doAnswer(
+              invocation -> {
+                Consumer<JobTemplateProvider> consumer = 
invocation.getArgument(0);
+                consumer.accept(provider);
+                return null;
+              })
+          .when(serviceLoader)
+          .forEach(any());
+
+      Map<String, JobTemplate> templates = listener.loadBuiltInJobTemplates();
+
+      // Should keep the higher version (v2)
+      Assertions.assertEquals(1, templates.size());
+      Assertions.assertTrue(templates.containsKey("builtin-sparkpi"));
+      Assertions.assertEquals("v2", 
templates.get("builtin-sparkpi").customFields().get("version"));
+    }
+  }
+
+  @Test
+  public void testLoadBuiltInJobTemplatesWithInvalidNames() {
+    JobTemplateProvider provider = mock(JobTemplateProvider.class);
+
+    ShellJobTemplate invalidName =
+        ShellJobTemplate.builder()
+            .withName("sparkpi") // Missing builtin- prefix
+            .withComment("invalid")
+            .withExecutable("/bin/echo")
+            .withCustomFields(Collections.singletonMap("version", "v1"))
+            .build();
+
+    ShellJobTemplate validName =
+        ShellJobTemplate.builder()
+            .withName("builtin-valid")
+            .withComment("valid")
+            .withExecutable("/bin/echo")
+            .withCustomFields(Collections.singletonMap("version", "v1"))
+            .build();
+
+    when(provider.jobTemplates()).thenAnswer(invocation -> Arrays.asList(invalidName, validName));
+
+    try (MockedStatic<ServiceLoader> mockedLoader = mockStatic(ServiceLoader.class)) {
+      ServiceLoader<JobTemplateProvider> serviceLoader = mock(ServiceLoader.class);
+      mockedLoader
+          .when(() -> ServiceLoader.load(eq(JobTemplateProvider.class), any()))
+          .thenReturn(serviceLoader);
+      when(serviceLoader.iterator()).thenReturn(Lists.newArrayList(provider).iterator());
+      when(serviceLoader.spliterator()).thenReturn(Lists.newArrayList(provider).spliterator());
+      Mockito.doAnswer(
+              invocation -> {
+                Consumer<JobTemplateProvider> consumer = invocation.getArgument(0);
+                consumer.accept(provider);
+                return null;
+              })
+          .when(serviceLoader)
+          .forEach(any());
+
+      Map<String, JobTemplate> templates = listener.loadBuiltInJobTemplates();
+
+      // Only the template carrying the builtin- prefix is expected in the result
+      Assertions.assertEquals(1, templates.size());
+      Assertions.assertTrue(templates.containsKey("builtin-valid"));
+      Assertions.assertFalse(templates.containsKey("sparkpi"));
+    }
+  }
+
+  @Test
+  public void testLoadBuiltInJobTemplatesWithInvalidVersions() {
+    JobTemplateProvider provider = mock(JobTemplateProvider.class);
+
+    ShellJobTemplate invalidVersion =
+        ShellJobTemplate.builder()
+            .withName("builtin-invalid")
+            .withComment("invalid")
+            .withExecutable("/bin/echo")
+            .withCustomFields(Collections.singletonMap("version", "version1")) // Invalid format
+            .build();
+
+    ShellJobTemplate validVersion =
+        ShellJobTemplate.builder()
+            .withName("builtin-valid")
+            .withComment("valid")
+            .withExecutable("/bin/echo")
+            .withCustomFields(Collections.singletonMap("version", "v1"))
+            .build();
+
+    when(provider.jobTemplates())
+        .thenAnswer(invocation -> Arrays.asList(invalidVersion, validVersion));
+
+    try (MockedStatic<ServiceLoader> mockedLoader = mockStatic(ServiceLoader.class)) {
+      ServiceLoader<JobTemplateProvider> serviceLoader = mock(ServiceLoader.class);
+      mockedLoader
+          .when(() -> ServiceLoader.load(eq(JobTemplateProvider.class), any()))
+          .thenReturn(serviceLoader);
+      when(serviceLoader.iterator()).thenReturn(Lists.newArrayList(provider).iterator());
+      when(serviceLoader.spliterator()).thenReturn(Lists.newArrayList(provider).spliterator());
+      Mockito.doAnswer(
+              invocation -> {
+                Consumer<JobTemplateProvider> consumer = invocation.getArgument(0);
+                consumer.accept(provider);
+                return null;
+              })
+          .when(serviceLoader)
+          .forEach(any());
+
+      Map<String, JobTemplate> templates = listener.loadBuiltInJobTemplates();
+
+      // Only the template with the well-formed version ("v1") is expected in the result
+      Assertions.assertEquals(1, templates.size());
+      Assertions.assertTrue(templates.containsKey("builtin-valid"));
+    }
+  }
+
+  @Test
+  public void testReconcileBuiltInJobTemplatesNewRegistration() {
+    String metalakeName = "test_metalake";
+    Map<String, JobTemplate> builtInTemplates = new HashMap<>();
+
+    ShellJobTemplate newTemplate =
+        ShellJobTemplate.builder()
+            .withName("builtin-new")
+            .withComment("new template")
+            .withExecutable("/bin/echo")
+            .withCustomFields(Collections.singletonMap("version", "v1"))
+            .build();
+    builtInTemplates.put("builtin-new", newTemplate); // absent from the mocked listing below
+
+    when(jobManager.listJobTemplates(metalakeName)).thenReturn(Collections.emptyList());
+    doNothing().when(jobManager).registerJobTemplate(eq(metalakeName), any());
+
+    listener.reconcileBuiltInJobTemplates(metalakeName, builtInTemplates);
+
+    verify(jobManager, times(1)).registerJobTemplate(eq(metalakeName), any());
+  }
+
+  @Test
+  public void testReconcileBuiltInJobTemplatesUpdate() throws IOException {
+    String metalakeName = "test_metalake";
+    Map<String, JobTemplate> builtInTemplates = new HashMap<>();
+
+    ShellJobTemplate updatedTemplate =
+        ShellJobTemplate.builder()
+            .withName("builtin-existing")
+            .withComment("updated")
+            .withExecutable("/bin/echo")
+            .withCustomFields(Collections.singletonMap("version", "v2"))
+            .build();
+    builtInTemplates.put("builtin-existing", updatedTemplate);
+
+    // Seed the real entityStore with a v1 entity that the v2 built-in template should replace
+    JobTemplateEntity existingEntity = createJobTemplateEntity("builtin-existing", "v1");
+    entityStore.put(existingEntity, false);
+
+    when(jobManager.listJobTemplates(metalakeName))
+        .thenReturn(Collections.singletonList(existingEntity));
+
+    listener.reconcileBuiltInJobTemplates(metalakeName, builtInTemplates);
+
+    // Reload from the store: the persisted template must now carry version v2
+    JobTemplateEntity updated =
+        entityStore.get(
+            existingEntity.nameIdentifier(),
+            Entity.EntityType.JOB_TEMPLATE,
+            JobTemplateEntity.class);
+    Assertions.assertEquals("v2", updated.templateContent().customFields().get("version"));
+  }
+
+  @Test
+  public void testReconcileBuiltInJobTemplatesNoUpdateWhenVersionSame() throws IOException {
+    String metalakeName = "test_metalake";
+    Map<String, JobTemplate> builtInTemplates = new HashMap<>();
+
+    ShellJobTemplate template =
+        ShellJobTemplate.builder()
+            .withName("builtin-existing")
+            .withComment("same version")
+            .withExecutable("/bin/echo")
+            .withCustomFields(Collections.singletonMap("version", "v1"))
+            .build();
+    builtInTemplates.put("builtin-existing", template);
+
+    // Seed the store with an entity whose version (v1) equals the built-in one
+    JobTemplateEntity existingEntity = createJobTemplateEntity("builtin-existing", "v1");
+    entityStore.put(existingEntity, false);
+
+    when(jobManager.listJobTemplates(metalakeName))
+        .thenReturn(Collections.singletonList(existingEntity));
+
+    listener.reconcileBuiltInJobTemplates(metalakeName, builtInTemplates);
+
+    // Equal versions: nothing may be registered through the job manager
+    verify(jobManager, times(0)).registerJobTemplate(any(), any());
+
+    // The stored entity must still report version v1
+    JobTemplateEntity result =
+        entityStore.get(
+            existingEntity.nameIdentifier(),
+            Entity.EntityType.JOB_TEMPLATE,
+            JobTemplateEntity.class);
+    Assertions.assertEquals("v1", result.templateContent().customFields().get("version"));
+  }
+
+  @Test
+  public void testReconcileBuiltInJobTemplatesDeleteObsolete() {
+    String metalakeName = "test_metalake";
+    Map<String, JobTemplate> builtInTemplates = new HashMap<>(); // Empty - no templates
+
+    JobTemplateEntity obsoleteEntity = createJobTemplateEntity("builtin-obsolete", "v1");
+
+    when(jobManager.listJobTemplates(metalakeName))
+        .thenReturn(Collections.singletonList(obsoleteEntity));
+    doReturn(true).when(jobManager).deleteJobTemplate(metalakeName, "builtin-obsolete");
+
+    listener.reconcileBuiltInJobTemplates(metalakeName, builtInTemplates);
+
+    // The listed template has no built-in counterpart any more, so it must be deleted
+    verify(jobManager, times(1)).deleteJobTemplate(metalakeName, "builtin-obsolete");
+  }
+
+  @Test
+  public void testReconcileHandlesRegistrationException() {
+    String metalakeName = "test_metalake";
+    Map<String, JobTemplate> builtInTemplates = new HashMap<>();
+
+    ShellJobTemplate template =
+        ShellJobTemplate.builder()
+            .withName("builtin-new")
+            .withComment("new")
+            .withExecutable("/bin/echo")
+            .withCustomFields(Collections.singletonMap("version", "v1"))
+            .build();
+    builtInTemplates.put("builtin-new", template);
+
+    when(jobManager.listJobTemplates(metalakeName)).thenReturn(Collections.emptyList());
+    Mockito.doThrow(new JobTemplateAlreadyExistsException("Already exists"))
+        .when(jobManager)
+        .registerJobTemplate(eq(metalakeName), any());
+    // Reconciliation must absorb the registration failure instead of propagating it
+    Assertions.assertDoesNotThrow(
+        () -> listener.reconcileBuiltInJobTemplates(metalakeName, builtInTemplates));
+  }
+
+  private void createMetalake(String name, boolean inUse) throws IOException {
+    Map<String, String> props = new HashMap<>();
+    props.put(PROPERTY_IN_USE, String.valueOf(inUse)); // stored as "true" or "false"
+
+    BaseMetalake metalake =
+        BaseMetalake.builder()
+            .withId(idGenerator.nextId())
+            .withName(name)
+            .withComment("test metalake")
+            .withProperties(props)
+            .withVersion(SchemaVersion.V_0_1)
+            .withAuditInfo(
+                AuditInfo.builder().withCreator("test").withCreateTime(Instant.now()).build())
+            .build();
+
+    entityStore.put(metalake, false);
+  }
+
+  private JobTemplateEntity createJobTemplateEntity(String name, String version) {
+    ShellJobTemplate template =
+        ShellJobTemplate.builder()
+            .withName(name)
+            .withComment("test")
+            .withExecutable("/bin/echo")
+            .withCustomFields(Collections.singletonMap("version", version))
+            .build();
+
+    return JobTemplateEntity.builder()
+        .withId(1L)
+        .withName(name)
+        .withNamespace(NamespaceUtil.ofJobTemplate("test_metalake")) // metalake used by the tests
+        .withTemplateContent(JobTemplateEntity.TemplateContent.fromJobTemplate(template))
+        .withAuditInfo(
+            AuditInfo.builder().withCreator("test").withCreateTime(Instant.now()).build())
+        .build();
+  }
+}
diff --git a/core/src/test/java/org/apache/gravitino/job/TestJobManager.java b/core/src/test/java/org/apache/gravitino/job/TestJobManager.java
index 4e669bec79..48f15f2b67 100644
--- a/core/src/test/java/org/apache/gravitino/job/TestJobManager.java
+++ b/core/src/test/java/org/apache/gravitino/job/TestJobManager.java
@@ -576,6 +576,12 @@ public class TestJobManager {
             .build();
     when(entityStore.list(Namespace.empty(), BaseMetalake.class, 
Entity.EntityType.METALAKE))
         .thenReturn(ImmutableList.of(mockMetalake));
+
+    // Mock MetalakeManager.listInUseMetalakes to return the test metalake
+    mockedMetalake
+        .when(() -> MetalakeManager.listInUseMetalakes(entityStore))
+        .thenReturn(ImmutableList.of(metalake));
+
     when(jobManager.listJobs(metalake, 
Optional.empty())).thenReturn(ImmutableList.of(job));
 
     
when(jobExecutor.getJobStatus(job.jobExecutionId())).thenReturn(JobHandle.Status.QUEUED);
@@ -600,6 +606,11 @@ public class TestJobManager {
     when(entityStore.list(Namespace.empty(), BaseMetalake.class, 
Entity.EntityType.METALAKE))
         .thenReturn(ImmutableList.of(mockMetalake));
 
+    // Mock MetalakeManager.listInUseMetalakes to return the test metalake
+    mockedMetalake
+        .when(() -> MetalakeManager.listInUseMetalakes(entityStore))
+        .thenReturn(ImmutableList.of(metalake));
+
     when(jobManager.listJobs(metalake, 
Optional.empty())).thenReturn(ImmutableList.of(job));
     Assertions.assertDoesNotThrow(() -> jobManager.cleanUpStagingDirs());
     verify(entityStore, never()).delete(any(), any());
diff --git a/core/src/test/java/org/apache/gravitino/job/TestJobTemplateValidationDispatcher.java b/core/src/test/java/org/apache/gravitino/job/TestJobTemplateValidationDispatcher.java
new file mode 100644
index 0000000000..0f27df37cb
--- /dev/null
+++ b/core/src/test/java/org/apache/gravitino/job/TestJobTemplateValidationDispatcher.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.job;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.time.Instant;
+import java.util.Collections;
+import java.util.Optional;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.exceptions.IllegalJobTemplateOperationException;
+import org.apache.gravitino.exceptions.JobTemplateAlreadyExistsException;
+import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.meta.JobTemplateEntity;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+public class TestJobTemplateValidationDispatcher {
+
+  private JobOperationDispatcher mockDispatcher;
+  private JobTemplateValidationDispatcher validationDispatcher;
+
+  @BeforeEach
+  public void setUp() {
+    mockDispatcher = mock(JobOperationDispatcher.class);
+    validationDispatcher = new JobTemplateValidationDispatcher(mockDispatcher);
+  }
+
+  @Test
+  public void testIsBuiltInTemplateName() {
+    // Names starting with the "builtin-" prefix, including the bare prefix itself
+    assertTrue(JobTemplateValidationDispatcher.isBuiltInTemplateName("builtin-spark-pi"));
+    assertTrue(JobTemplateValidationDispatcher.isBuiltInTemplateName("builtin-test"));
+    assertTrue(JobTemplateValidationDispatcher.isBuiltInTemplateName("builtin-"));
+
+    // Everything else, including a differently-cased prefix, the empty string and null
+    assertFalse(JobTemplateValidationDispatcher.isBuiltInTemplateName("my-template"));
+    assertFalse(JobTemplateValidationDispatcher.isBuiltInTemplateName("user-template"));
+    assertFalse(JobTemplateValidationDispatcher.isBuiltInTemplateName("BUILTIN-test"));
+    assertFalse(JobTemplateValidationDispatcher.isBuiltInTemplateName(""));
+    assertFalse(JobTemplateValidationDispatcher.isBuiltInTemplateName(null));
+  }
+
+  @Test
+  public void testRegisterJobTemplateWithBuiltInName() {
+    JobTemplateEntity entity = createJobTemplateEntity("builtin-test-template");
+
+    IllegalJobTemplateOperationException exception =
+        assertThrows(
+            IllegalJobTemplateOperationException.class,
+            () -> validationDispatcher.registerJobTemplate("metalake1", entity));
+
+    assertTrue(exception.getMessage().contains("builtin-test-template"));
+    assertTrue(exception.getMessage().contains("reserved for built-in templates"));
+    verify(mockDispatcher, never()).registerJobTemplate(anyString(), any());
+  }
+
+  @Test
+  public void testRegisterJobTemplateWithUserName() throws JobTemplateAlreadyExistsException {
+    JobTemplateEntity entity = createJobTemplateEntity("my-custom-template");
+
+    doNothing().when(mockDispatcher).registerJobTemplate(eq("metalake1"), eq(entity));
+
+    assertDoesNotThrow(() -> validationDispatcher.registerJobTemplate("metalake1", entity));
+
+    verify(mockDispatcher).registerJobTemplate("metalake1", entity);
+  }
+
+  @Test
+  public void testDeleteBuiltInJobTemplate() {
+    IllegalJobTemplateOperationException exception =
+        assertThrows(
+            IllegalJobTemplateOperationException.class,
+            () -> validationDispatcher.deleteJobTemplate("metalake1", "builtin-spark-pi"));
+
+    assertTrue(exception.getMessage().contains("Cannot delete"));
+    assertTrue(exception.getMessage().contains("builtin-spark-pi"));
+    assertTrue(exception.getMessage().contains("managed by the system"));
+    verify(mockDispatcher, never()).deleteJobTemplate(anyString(), anyString());
+  }
+
+  @Test
+  public void testDeleteUserJobTemplate() {
+    when(mockDispatcher.deleteJobTemplate("metalake1", "my-template")).thenReturn(true);
+
+    boolean result = validationDispatcher.deleteJobTemplate("metalake1", "my-template");
+
+    assertTrue(result);
+    verify(mockDispatcher).deleteJobTemplate("metalake1", "my-template");
+  }
+
+  @Test
+  public void testAlterBuiltInJobTemplate() {
+    JobTemplateChange change = JobTemplateChange.updateComment("new comment");
+
+    IllegalJobTemplateOperationException exception =
+        assertThrows(
+            IllegalJobTemplateOperationException.class,
+            () -> validationDispatcher.alterJobTemplate("metalake1", "builtin-spark-pi", change));
+
+    assertTrue(exception.getMessage().contains("Cannot alter"));
+    assertTrue(exception.getMessage().contains("builtin-spark-pi"));
+    verify(mockDispatcher, never())
+        .alterJobTemplate(anyString(), anyString(), any(JobTemplateChange[].class));
+  }
+
+  @Test
+  public void testAlterUserJobTemplate() {
+    JobTemplateEntity entity = createJobTemplateEntity("my-template");
+    JobTemplateChange change = JobTemplateChange.updateComment("new comment");
+
+    when(mockDispatcher.alterJobTemplate("metalake1", "my-template", change)).thenReturn(entity);
+
+    JobTemplateEntity result =
+        validationDispatcher.alterJobTemplate("metalake1", "my-template", change);
+
+    assertEquals(entity, result);
+    verify(mockDispatcher).alterJobTemplate("metalake1", "my-template", change);
+  }
+
+  @Test
+  public void testRenameToBuiltInName() {
+    JobTemplateChange change = JobTemplateChange.rename("builtin-new-name");
+
+    IllegalJobTemplateOperationException exception =
+        assertThrows(
+            IllegalJobTemplateOperationException.class,
+            () -> validationDispatcher.alterJobTemplate("metalake1", "my-template", change));
+
+    assertTrue(exception.getMessage().contains("builtin-new-name"));
+    assertTrue(exception.getMessage().contains("reserved for built-in templates"));
+    verify(mockDispatcher, never())
+        .alterJobTemplate(anyString(), anyString(), any(JobTemplateChange[].class));
+  }
+
+  @Test
+  public void testRenameToUserName() {
+    JobTemplateEntity entity = createJobTemplateEntity("new-user-name");
+    JobTemplateChange change = JobTemplateChange.rename("new-user-name");
+
+    when(mockDispatcher.alterJobTemplate("metalake1", "old-name", change)).thenReturn(entity);
+
+    JobTemplateEntity result =
+        validationDispatcher.alterJobTemplate("metalake1", "old-name", change);
+
+    assertEquals(entity, result);
+    verify(mockDispatcher).alterJobTemplate("metalake1", "old-name", change);
+  }
+
+  @Test
+  public void testMultipleChangesWithBuiltInRename() {
+    JobTemplateChange change1 = JobTemplateChange.updateComment("new comment");
+    JobTemplateChange change2 = JobTemplateChange.rename("builtin-new-name");
+
+    IllegalJobTemplateOperationException exception =
+        assertThrows(
+            IllegalJobTemplateOperationException.class,
+            () ->
+                validationDispatcher.alterJobTemplate(
+                    "metalake1", "my-template", change1, change2));
+
+    assertTrue(exception.getMessage().contains("builtin-new-name"));
+    verify(mockDispatcher, never())
+        .alterJobTemplate(anyString(), anyString(), any(JobTemplateChange[].class));
+  }
+
+  @Test
+  public void testListJobTemplates() {
+    validationDispatcher.listJobTemplates("metalake1");
+    verify(mockDispatcher).listJobTemplates("metalake1"); // forwarded unchanged
+  }
+
+  @Test
+  public void testGetJobTemplate() {
+    validationDispatcher.getJobTemplate("metalake1", "any-template");
+    verify(mockDispatcher).getJobTemplate("metalake1", "any-template"); // forwarded unchanged
+  }
+
+  @Test
+  public void testListJobs() {
+    validationDispatcher.listJobs("metalake1", Optional.empty());
+    verify(mockDispatcher).listJobs("metalake1", Optional.empty()); // forwarded unchanged
+  }
+
+  @Test
+  public void testGetJob() {
+    validationDispatcher.getJob("metalake1", "job-123");
+    verify(mockDispatcher).getJob("metalake1", "job-123"); // forwarded unchanged
+  }
+
+  @Test
+  public void testRunJob() {
+    validationDispatcher.runJob("metalake1", "my-template", Collections.emptyMap());
+    verify(mockDispatcher).runJob("metalake1", "my-template", Collections.emptyMap());
+  }
+
+  @Test
+  public void testCancelJob() {
+    validationDispatcher.cancelJob("metalake1", "job-123");
+    verify(mockDispatcher).cancelJob("metalake1", "job-123"); // forwarded unchanged
+  }
+
+  private JobTemplateEntity createJobTemplateEntity(String name) {
+    return JobTemplateEntity.builder()
+        .withId(1L)
+        .withName(name)
+        .withNamespace(Namespace.of("metalake1"))
+        .withComment("test template")
+        .withTemplateContent(
+            JobTemplateEntity.TemplateContent.builder()
+                .withJobType(JobTemplate.JobType.SPARK)
+                .withExecutable("spark-submit")
+                .withArguments(Collections.emptyList())
+                .withEnvironments(Collections.emptyMap())
+                .withCustomFields(Collections.emptyMap())
+                .withClassName("com.example.Main")
+                .withJars(Collections.emptyList())
+                .withFiles(Collections.emptyList())
+                .withArchives(Collections.emptyList())
+                .withConfigs(Collections.emptyMap())
+                .build())
+        .withAuditInfo(
+            AuditInfo.builder().withCreator("test").withCreateTime(Instant.now()).build())
+        .build();
+  }
+}
diff --git a/core/src/test/java/org/apache/gravitino/metalake/TestMetalakeManager.java b/core/src/test/java/org/apache/gravitino/metalake/TestMetalakeManager.java
index 03624f28da..4a91539d4f 100644
--- a/core/src/test/java/org/apache/gravitino/metalake/TestMetalakeManager.java
+++ b/core/src/test/java/org/apache/gravitino/metalake/TestMetalakeManager.java
@@ -26,6 +26,7 @@ import static org.mockito.Mockito.doReturn;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Sets;
 import java.io.IOException;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.apache.gravitino.Config;
@@ -209,6 +210,35 @@ public class TestMetalakeManager {
     Assertions.assertFalse(dropped1, "metalake should be non-existent");
   }
 
+  @Test
+  public void testListInUseMetalakes() {
+    // Three metalakes; one of them is disabled further down
+    NameIdentifier ident1 = NameIdentifier.of("metalake1");
+    NameIdentifier ident2 = NameIdentifier.of("metalake2");
+    NameIdentifier ident3 = NameIdentifier.of("metalake3");
+
+    // Create metalakes - by default they should be in-use (PROPERTY_IN_USE defaults to true)
+    metalakeManager.createMetalake(ident1, "comment1", ImmutableMap.of());
+    metalakeManager.createMetalake(ident2, "comment2", ImmutableMap.of());
+    metalakeManager.createMetalake(ident3, "comment3", ImmutableMap.of());
+
+    // Disable metalake2
+    metalakeManager.disableMetalake(ident2);
+
+    // List in-use metalakes straight from the entity store
+    List<String> inUseMetalakes = MetalakeManager.listInUseMetalakes(entityStore);
+
+    // Should contain metalake1 and metalake3, but not metalake2
+    Assertions.assertTrue(inUseMetalakes.contains("metalake1"));
+    Assertions.assertFalse(inUseMetalakes.contains("metalake2"));
+    Assertions.assertTrue(inUseMetalakes.contains("metalake3"));
+
+    // Cleanup: drop all three metalakes, including the disabled one
+    metalakeManager.dropMetalake(ident1, true);
+    metalakeManager.dropMetalake(ident2, true);
+    metalakeManager.dropMetalake(ident3, true);
+  }
+
   private void testProperties(Map<String, String> expectedProps, Map<String, 
String> testProps) {
     expectedProps.forEach(
         (k, v) -> {
diff --git a/maintenance/jobs/build.gradle.kts b/maintenance/jobs/build.gradle.kts
new file mode 100644
index 0000000000..9a0266f7d7
--- /dev/null
+++ b/maintenance/jobs/build.gradle.kts
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar
+
+plugins {
+  `maven-publish`
+  id("java")
+  alias(libs.plugins.shadow)
+}
+
+repositories {
+  mavenCentral()
+}
+
+val scalaVersion: String = project.properties["scalaVersion"] as? String ?: extra["defaultScalaVersion"].toString()
+val sparkVersion: String = libs.versions.spark35.get()
+
+dependencies {
+  compileOnly(project(":api"))
+
+  compileOnly(libs.slf4j.api)
+  compileOnly("org.apache.spark:spark-sql_$scalaVersion:$sparkVersion") {
+    exclude("org.slf4j")
+    exclude("org.apache.logging.log4j")
+  }
+
+  testImplementation(project(":api"))
+  testImplementation(libs.bundles.log4j)
+  testImplementation(libs.junit.jupiter.api)
+  testRuntimeOnly(libs.junit.jupiter.engine)
+}
+
+tasks.test {
+  useJUnitPlatform()
+}
+
+tasks.withType(ShadowJar::class.java) {
+  isZip64 = true
+  archiveClassifier.set("") // the shadow jar carries no classifier
+  mergeServiceFiles()
+
+  dependencies {
+    exclude(dependency("org.apache.spark:.*"))
+    exclude(dependency("org.slf4j:slf4j-api"))
+    exclude(dependency("org.apache.logging.log4j:.*"))
+  }
+}
+
+tasks.jar {
+  dependsOn(tasks.named("shadowJar"))
+  archiveClassifier.set("empty") // gives the plain jar a distinct name from the shadow jar
+}
diff --git a/maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/BuiltInJob.java b/maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/BuiltInJob.java
new file mode 100644
index 0000000000..9f8bb59956
--- /dev/null
+++ b/maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/BuiltInJob.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.maintenance.jobs;
+
+import java.net.URISyntaxException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.security.CodeSource;
+import org.apache.gravitino.job.JobTemplate;
+
+/** Contract for built-in jobs to expose their job template definitions. */
+public interface BuiltInJob {
+
+  /** Returns the built-in job template provided by this job. */
+  JobTemplate jobTemplate();
+
+  /** Returns the absolute path of the code-source location that {@code cls} was loaded from. */
+  default String resolveExecutable(Class<?> cls) {
+    try {
+      CodeSource codeSource = cls.getProtectionDomain().getCodeSource();
+      if (codeSource == null || codeSource.getLocation() == null) {
+        throw new RuntimeException("Failed to resolve built-in jobs jar location");
+      }
+
+      Path path = Paths.get(codeSource.getLocation().toURI());
+      return path.toAbsolutePath().toString();
+    } catch (URISyntaxException e) {
+      throw new RuntimeException("Failed to resolve built-in jobs jar location", e);
+    }
+  }
+}
diff --git a/maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/BuiltInJobTemplateProvider.java b/maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/BuiltInJobTemplateProvider.java
new file mode 100644
index 0000000000..3758fe8d26
--- /dev/null
+++ b/maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/BuiltInJobTemplateProvider.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.maintenance.jobs;
+
+import com.google.common.collect.ImmutableList;
+import java.util.List;
+import java.util.Optional;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import org.apache.gravitino.job.JobTemplate;
+import org.apache.gravitino.job.JobTemplateProvider;
+import org.apache.gravitino.maintenance.jobs.spark.SparkPiJob;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Provides built-in job templates bundled with the Gravitino jobs module. */
+public class BuiltInJobTemplateProvider implements JobTemplateProvider {
+
+  private static final Logger LOG = LoggerFactory.getLogger(BuiltInJobTemplateProvider.class);
+
+  private static final Pattern NAME_PATTERN =
+      Pattern.compile(JobTemplateProvider.BUILTIN_NAME_PATTERN);
+  private static final Pattern VERSION_PATTERN =
+      Pattern.compile(JobTemplateProvider.VERSION_VALUE_PATTERN);
+
+  private static final List<BuiltInJob> BUILT_IN_JOBS = ImmutableList.of(new SparkPiJob());
+
+  @Override
+  public List<? extends JobTemplate> jobTemplates() {
+    return BUILT_IN_JOBS.stream()
+        .map(BuiltInJob::jobTemplate)
+        .filter(this::isValid) // drops templates with an illegal name or version
+        .collect(Collectors.toList());
+  }
+
+  private boolean isValid(JobTemplate template) {
+    if (!NAME_PATTERN.matcher(template.name()).matches()) {
+      LOG.warn("Skip built-in job template with illegal name: {}", template.name());
+      return false;
+    }
+
+    Optional<String> version =
+        Optional.ofNullable(template.customFields())
+            .map(fields -> fields.get(JobTemplateProvider.PROPERTY_VERSION_KEY));
+    if (version.isEmpty() || !VERSION_PATTERN.matcher(version.get()).matches()) {
+      LOG.warn("Skip built-in job template {} without valid version", template.name());
+      return false;
+    }
+
+    return true;
+  }
+}
diff --git 
a/maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/spark/SparkPiJob.java
 
b/maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/spark/SparkPiJob.java
new file mode 100644
index 0000000000..6ff35181ea
--- /dev/null
+++ 
b/maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/spark/SparkPiJob.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.maintenance.jobs.spark;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.gravitino.job.JobTemplateProvider;
+import org.apache.gravitino.job.SparkJobTemplate;
+import org.apache.gravitino.maintenance.jobs.BuiltInJob;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SparkSession;
+
+/**
+ * Self-contained Spark Pi program for the built-in SparkPi job template.
+ *
+ * <p>This avoids depending on the Spark examples jar so the built-in template can run with only the
+ * Gravitino-provided jobs artifact on the classpath.
+ *
+ * <p>The class plays two roles: {@link #jobTemplate()} describes how to submit the job, and {@link
+ * #main(String[])} is the program that the submitted job actually runs.
+ */
+public class SparkPiJob implements BuiltInJob {
+
+  private static final String NAME = JobTemplateProvider.BUILTIN_NAME_PREFIX + "sparkpi";
+  // Bump VERSION whenever SparkPi template behavior changes (name/executable/class/args/configs).
+  private static final String VERSION = "v1";
+
+  // Used when no slice count is passed or when the passed value is not an integer.
+  private static final int DEFAULT_SLICES = 2;
+  // Kept as a long so the per-slice multiplication cannot overflow before it is capped.
+  private static final long SAMPLES_PER_SLICE = 100000L;
+
+  /**
+   * Describes the SparkPi job: this class is both the executable location and the main class, the
+   * single argument is the {@code {{slices}}} placeholder, and the Spark settings are placeholders
+   * as well.
+   *
+   * @return the Spark job template for the built-in SparkPi job
+   */
+  @Override
+  public SparkJobTemplate jobTemplate() {
+    return SparkJobTemplate.builder()
+        .withName(NAME)
+        .withComment("Built-in SparkPi job template")
+        // NOTE(review): resolveExecutable is declared outside this file; presumably it returns
+        // the jar or classes directory that contains the given class.
+        .withExecutable(resolveExecutable(SparkPiJob.class))
+        .withClassName(SparkPiJob.class.getName())
+        .withArguments(Collections.singletonList("{{slices}}"))
+        .withConfigs(buildSparkConfigs())
+        .withCustomFields(
+            Collections.singletonMap(JobTemplateProvider.PROPERTY_VERSION_KEY, VERSION))
+        .build();
+  }
+
+  /**
+   * Estimates Pi with a Monte Carlo simulation and prints the result to standard output.
+   *
+   * @param args optional; {@code args[0]} is the number of slices (partitions). A value that is
+   *     not an integer falls back to 2, and a value below 1 is raised to 1.
+   */
+  public static void main(String[] args) {
+    int slices = parseSlices(args);
+    int samples = sampleCount(slices);
+
+    SparkSession spark =
+        SparkSession.builder()
+            .appName("Gravitino Built-in SparkPi")
+            // Rely on external cluster/master configuration
+            .getOrCreate();
+
+    // Stop the session even when the computation fails so the application does not linger.
+    try {
+      JavaSparkContext jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());
+      JavaRDD<Integer> rdd =
+          jsc.parallelize(IntStream.range(0, samples).boxed().collect(Collectors.toList()), slices);
+      // Count the random points of the [-1, 1] x [-1, 1] square that land inside the unit circle.
+      long count =
+          rdd.filter(
+                  i -> {
+                    double x = Math.random() * 2 - 1;
+                    double y = Math.random() * 2 - 1;
+                    return x * x + y * y <= 1;
+                  })
+              .count();
+
+      double pi = 4.0 * count / samples;
+      System.out.printf("Pi is roughly %.5f%n", pi);
+    } finally {
+      spark.stop();
+    }
+  }
+
+  /**
+   * Reads the slice count from the first program argument.
+   *
+   * <p>The result is always at least 1 because Spark rejects a non-positive partition count.
+   */
+  private static int parseSlices(String[] args) {
+    int slices = DEFAULT_SLICES;
+    if (args.length > 0) {
+      try {
+        slices = Integer.parseInt(args[0]);
+      } catch (NumberFormatException e) {
+        System.err.println("Invalid number of slices provided. Using default value of 2.");
+      }
+    }
+    return Math.max(slices, 1);
+  }
+
+  /** Computes the number of samples for the given slice count, capped at Integer.MAX_VALUE. */
+  private static int sampleCount(int slices) {
+    return (int) Math.min(SAMPLES_PER_SLICE * slices, Integer.MAX_VALUE);
+  }
+
+  /** Builds the read-only Spark settings of the template; every value is a placeholder. */
+  private Map<String, String> buildSparkConfigs() {
+    Map<String, String> configs = new HashMap<>();
+    configs.put("spark.master", "{{spark_master}}");
+    configs.put("spark.executor.instances", "{{spark_executor_instances}}");
+    configs.put("spark.executor.cores", "{{spark_executor_cores}}");
+    configs.put("spark.executor.memory", "{{spark_executor_memory}}");
+    configs.put("spark.driver.memory", "{{spark_driver_memory}}");
+    return Collections.unmodifiableMap(configs);
+  }
+}
diff --git 
a/maintenance/jobs/src/main/resources/META-INF/services/org.apache.gravitino.job.JobTemplateProvider
 
b/maintenance/jobs/src/main/resources/META-INF/services/org.apache.gravitino.job.JobTemplateProvider
new file mode 100644
index 0000000000..7b812fb9cd
--- /dev/null
+++ 
b/maintenance/jobs/src/main/resources/META-INF/services/org.apache.gravitino.job.JobTemplateProvider
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+org.apache.gravitino.maintenance.jobs.BuiltInJobTemplateProvider
diff --git 
a/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/TestBuiltInJob.java
 
b/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/TestBuiltInJob.java
new file mode 100644
index 0000000000..1660698b06
--- /dev/null
+++ 
b/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/TestBuiltInJob.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.maintenance.jobs;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.apache.gravitino.job.JobTemplate;
+import org.junit.jupiter.api.Test;
+
+/** Tests for the {@code resolveExecutable} method that {@link BuiltInJob} offers to its jobs. */
+public class TestBuiltInJob {
+
+  /** Bare-bones job used only to reach {@code resolveExecutable} through the interface. */
+  private static class TestBuiltInJobImpl implements BuiltInJob {
+    @Override
+    public JobTemplate jobTemplate() {
+      // These tests never look at the template.
+      return null;
+    }
+  }
+
+  /** Resolves the executable location for the test implementation class. */
+  private static String resolveForTestImpl() {
+    return new TestBuiltInJobImpl().resolveExecutable(TestBuiltInJobImpl.class);
+  }
+
+  @Test
+  public void testResolveExecutableReturnsValidPath() {
+    String resolved = resolveForTestImpl();
+
+    assertNotNull(resolved, "Executable path should not be null");
+    assertFalse(resolved.trim().isEmpty(), "Executable path should not be empty");
+  }
+
+  @Test
+  public void testResolveExecutablePointsToJarOrClassDirectory() {
+    String resolved = resolveForTestImpl();
+
+    boolean pointsToJar = resolved.endsWith(".jar");
+    // A path containing "test-classes" also contains "classes", so one check covers both.
+    boolean pointsToClassesDir = resolved.contains("classes");
+
+    assertTrue(
+        pointsToJar || pointsToClassesDir,
+        "Executable should point to a jar file or classes directory: " + resolved);
+  }
+}
diff --git 
a/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/TestBuiltInJobTemplateProvider.java
 
b/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/TestBuiltInJobTemplateProvider.java
new file mode 100644
index 0000000000..f2cf0f6937
--- /dev/null
+++ 
b/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/TestBuiltInJobTemplateProvider.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.maintenance.jobs;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.List;
+import org.apache.gravitino.job.JobTemplate;
+import org.apache.gravitino.job.JobTemplateProvider;
+import org.junit.jupiter.api.Test;
+
+/** Tests the templates published by {@link BuiltInJobTemplateProvider}. */
+public class TestBuiltInJobTemplateProvider {
+
+  /** Asks a fresh provider for its templates. */
+  private static List<? extends JobTemplate> loadTemplates() {
+    return new BuiltInJobTemplateProvider().jobTemplates();
+  }
+
+  @Test
+  public void testJobTemplatesReturnsNonEmptyList() {
+    List<? extends JobTemplate> published = loadTemplates();
+
+    assertNotNull(published, "Job templates list should not be null");
+    assertFalse(published.isEmpty(), "Should provide at least one built-in job template");
+  }
+
+  @Test
+  public void testAllTemplatesHaveBuiltInPrefix() {
+    loadTemplates()
+        .forEach(
+            published -> {
+              String name = published.name();
+              assertTrue(
+                  name.startsWith(JobTemplateProvider.BUILTIN_NAME_PREFIX),
+                  "Template name should start with builtin- prefix: " + name);
+            });
+  }
+
+  @Test
+  public void testAllTemplatesHaveValidVersion() {
+    loadTemplates()
+        .forEach(
+            published -> {
+              String version =
+                  published.customFields().get(JobTemplateProvider.PROPERTY_VERSION_KEY);
+              assertNotNull(version, "Template should have version property: " + published.name());
+              assertTrue(
+                  version.matches(JobTemplateProvider.VERSION_VALUE_PATTERN),
+                  "Version should match pattern v\\d+: " + version);
+            });
+  }
+}
diff --git 
a/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/spark/TestSparkPiJob.java
 
b/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/spark/TestSparkPiJob.java
new file mode 100644
index 0000000000..e2afbd2115
--- /dev/null
+++ 
b/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/spark/TestSparkPiJob.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.maintenance.jobs.spark;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.Map;
+import org.apache.gravitino.job.JobTemplateProvider;
+import org.apache.gravitino.job.SparkJobTemplate;
+import org.junit.jupiter.api.Test;
+
+/** Tests the job template produced by {@link SparkPiJob}. */
+public class TestSparkPiJob {
+
+  /** Spark settings the template must carry, as {key, placeholder value} pairs. */
+  private static final String[][] EXPECTED_CONFIGS = {
+    {"spark.master", "{{spark_master}}"},
+    {"spark.executor.instances", "{{spark_executor_instances}}"},
+    {"spark.executor.cores", "{{spark_executor_cores}}"},
+    {"spark.executor.memory", "{{spark_executor_memory}}"},
+    {"spark.driver.memory", "{{spark_driver_memory}}"},
+  };
+
+  /** Builds a fresh template so every test inspects its own instance. */
+  private static SparkJobTemplate newTemplate() {
+    return new SparkPiJob().jobTemplate();
+  }
+
+  @Test
+  public void testJobTemplateHasCorrectName() {
+    SparkJobTemplate built = newTemplate();
+
+    assertNotNull(built);
+    assertEquals("builtin-sparkpi", built.name());
+  }
+
+  @Test
+  public void testJobTemplateHasComment() {
+    String comment = newTemplate().comment();
+
+    assertNotNull(comment);
+    assertFalse(comment.trim().isEmpty());
+  }
+
+  @Test
+  public void testJobTemplateHasExecutable() {
+    String executable = newTemplate().executable();
+
+    assertNotNull(executable);
+    assertFalse(executable.trim().isEmpty());
+  }
+
+  @Test
+  public void testJobTemplateHasMainClass() {
+    String mainClass = newTemplate().className();
+
+    assertNotNull(mainClass);
+    assertEquals(SparkPiJob.class.getName(), mainClass);
+  }
+
+  @Test
+  public void testJobTemplateHasArguments() {
+    SparkJobTemplate built = newTemplate();
+
+    assertNotNull(built.arguments());
+    assertEquals(1, built.arguments().size());
+    assertEquals("{{slices}}", built.arguments().get(0));
+  }
+
+  @Test
+  public void testJobTemplateHasSparkConfigs() {
+    Map<String, String> sparkConfigs = newTemplate().configs();
+    assertNotNull(sparkConfigs);
+    assertFalse(sparkConfigs.isEmpty());
+
+    // Every expected key must be present and still hold its placeholder value.
+    for (String[] expected : EXPECTED_CONFIGS) {
+      String key = expected[0];
+      String placeholder = expected[1];
+      assertTrue(sparkConfigs.containsKey(key));
+      assertEquals(placeholder, sparkConfigs.get(key));
+    }
+  }
+
+  @Test
+  public void testJobTemplateHasVersion() {
+    Map<String, String> fields = newTemplate().customFields();
+    assertNotNull(fields);
+    assertTrue(fields.containsKey(JobTemplateProvider.PROPERTY_VERSION_KEY));
+
+    String version = fields.get(JobTemplateProvider.PROPERTY_VERSION_KEY);
+    assertEquals("v1", version);
+    assertTrue(version.matches(JobTemplateProvider.VERSION_VALUE_PATTERN));
+  }
+
+  @Test
+  public void testJobTemplateNameMatchesBuiltInPattern() {
+    String name = newTemplate().name();
+
+    assertTrue(name.matches(JobTemplateProvider.BUILTIN_NAME_PATTERN));
+    assertTrue(name.startsWith(JobTemplateProvider.BUILTIN_NAME_PREFIX));
+  }
+
+  @Test
+  public void testResolveExecutableReturnsNonEmptyPath() {
+    String executable = newTemplate().executable();
+
+    assertNotNull(executable);
+    assertFalse(executable.trim().isEmpty());
+  }
+}
diff --git 
a/server/src/main/java/org/apache/gravitino/server/web/rest/ExceptionHandlers.java
 
b/server/src/main/java/org/apache/gravitino/server/web/rest/ExceptionHandlers.java
index d3ddc1087f..a4476d6a7b 100644
--- 
a/server/src/main/java/org/apache/gravitino/server/web/rest/ExceptionHandlers.java
+++ 
b/server/src/main/java/org/apache/gravitino/server/web/rest/ExceptionHandlers.java
@@ -28,6 +28,7 @@ import 
org.apache.gravitino.exceptions.ConnectionFailedException;
 import org.apache.gravitino.exceptions.FilesetAlreadyExistsException;
 import org.apache.gravitino.exceptions.ForbiddenException;
 import org.apache.gravitino.exceptions.GroupAlreadyExistsException;
+import org.apache.gravitino.exceptions.IllegalJobTemplateOperationException;
 import org.apache.gravitino.exceptions.InUseException;
 import org.apache.gravitino.exceptions.JobTemplateAlreadyExistsException;
 import org.apache.gravitino.exceptions.MetalakeAlreadyExistsException;
@@ -859,7 +860,10 @@ public class ExceptionHandlers {
       String errorMsg = getJobTemplateErrorMsg(formatted, op.name(), parent, 
getErrorMsg(e));
       LOG.warn(errorMsg, e);
 
-      if (e instanceof IllegalArgumentException) {
+      if (e instanceof IllegalJobTemplateOperationException) {
+        return Utils.illegalArguments(errorMsg, e);
+
+      } else if (e instanceof IllegalArgumentException) {
         return Utils.illegalArguments(errorMsg, e);
 
       } else if (e instanceof NotFoundException) {
diff --git a/settings.gradle.kts b/settings.gradle.kts
index 1be14c8e1d..75dab2bc0c 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -88,4 +88,4 @@ include(":bundles:azure", ":bundles:azure-bundle", 
":bundles:iceberg-azure-bundl
 include(":catalogs:hadoop-common")
 include(":lineage")
 include(":mcp-server")
-include(":maintenance:optimizer")
+include(":maintenance:optimizer", ":maintenance:jobs")

Reply via email to