This is an automated email from the ASF dual-hosted git repository.
fanng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new fc757d987e [#9541] feat(core): Add built-in job template framework in
Gravitino (#9542)
fc757d987e is described below
commit fc757d987e79c1fa7285095066cf53f6a2fdad7a
Author: Jerry Shao <[email protected]>
AuthorDate: Wed Dec 31 15:39:43 2025 +0800
[#9541] feat(core): Add built-in job template framework in Gravitino (#9542)
### What changes were proposed in this pull request?
This PR adds built-in job template support to Gravitino's job system.
### Why are the changes needed?
Fix: #9541
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
UTs.
---
.../IllegalJobTemplateOperationException.java | 54 +++
.../apache/gravitino/job/JobTemplateProvider.java | 49 ++
build.gradle.kts | 30 +-
.../org/apache/gravitino/client/ErrorHandlers.java | 9 +-
.../gravitino/client/integration/test/JobIT.java | 7 -
.../test/authorization/JobAuthorizationIT.java | 7 +-
clients/client-python/gravitino/exceptions/base.py | 8 +
.../exceptions/handlers/job_error_handler.py | 5 +
.../tests/integration/test_supports_jobs.py | 7 -
.../org/apache/gravitino/json/TestSerializer.java | 42 +-
.../java/org/apache/gravitino/GravitinoEnv.java | 14 +-
.../job/BuiltInJobTemplateEventListener.java | 395 ++++++++++++++++
.../java/org/apache/gravitino/job/JobManager.java | 25 +-
.../job/JobTemplateValidationDispatcher.java | 182 +++++++
.../apache/gravitino/metalake/MetalakeManager.java | 25 +
.../job/TestBuiltInJobTemplateEventListener.java | 525 +++++++++++++++++++++
.../org/apache/gravitino/job/TestJobManager.java | 11 +
.../job/TestJobTemplateValidationDispatcher.java | 254 ++++++++++
.../gravitino/metalake/TestMetalakeManager.java | 30 ++
maintenance/jobs/build.gradle.kts | 69 +++
.../gravitino/maintenance/jobs/BuiltInJob.java | 47 ++
.../jobs/BuiltInJobTemplateProvider.java | 68 +++
.../maintenance/jobs/spark/SparkPiJob.java | 104 ++++
.../org.apache.gravitino.job.JobTemplateProvider | 19 +
.../gravitino/maintenance/jobs/TestBuiltInJob.java | 58 +++
.../jobs/TestBuiltInJobTemplateProvider.java | 66 +++
.../maintenance/jobs/spark/TestSparkPiJob.java | 134 ++++++
.../server/web/rest/ExceptionHandlers.java | 6 +-
settings.gradle.kts | 2 +-
29 files changed, 2178 insertions(+), 74 deletions(-)
diff --git
a/api/src/main/java/org/apache/gravitino/exceptions/IllegalJobTemplateOperationException.java
b/api/src/main/java/org/apache/gravitino/exceptions/IllegalJobTemplateOperationException.java
new file mode 100644
index 0000000000..764c8ef722
--- /dev/null
+++
b/api/src/main/java/org/apache/gravitino/exceptions/IllegalJobTemplateOperationException.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.exceptions;
+
+import com.google.errorprone.annotations.FormatMethod;
+import com.google.errorprone.annotations.FormatString;
+
+/**
+ * Exception thrown when an illegal operation is attempted on a job template,
such as trying to
+ * modify or delete a built-in job template, or creating a user template with
a reserved name.
+ */
+public class IllegalJobTemplateOperationException extends
IllegalArgumentException {
+
+ /**
+ * Constructs a new IllegalJobTemplateOperationException with the specified
detail message.
+ *
+ * @param message the detail message
+ * @param args the arguments to the message
+ */
+ @FormatMethod
+ public IllegalJobTemplateOperationException(@FormatString String message,
Object... args) {
+ super(String.format(message, args));
+ }
+
+ /**
+ * Constructs a new IllegalJobTemplateOperationException with the specified
detail message and
+ * cause.
+ *
+ * @param cause the cause
+ * @param message the detail message
+ * @param args the arguments to the message
+ */
+ @FormatMethod
+ public IllegalJobTemplateOperationException(
+ Throwable cause, @FormatString String message, Object... args) {
+ super(String.format(message, args), cause);
+ }
+}
diff --git
a/api/src/main/java/org/apache/gravitino/job/JobTemplateProvider.java
b/api/src/main/java/org/apache/gravitino/job/JobTemplateProvider.java
new file mode 100644
index 0000000000..2dfee7ed1a
--- /dev/null
+++ b/api/src/main/java/org/apache/gravitino/job/JobTemplateProvider.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.job;
+
+import java.util.List;
+
+/**
+ * JobTemplateProvider exposes job templates that can be registered into
Gravitino.
+ *
+ * <p>Implementations are expected to be discoverable (for example via SPI) so
that Gravitino can
+ * onboard built-in job templates automatically.
+ */
+public interface JobTemplateProvider {
+
+ /** Prefix for built-in job template names. */
+ String BUILTIN_NAME_PREFIX = "builtin-";
+
+ /** Regex to validate built-in job template names. */
+ String BUILTIN_NAME_PATTERN = "^" + BUILTIN_NAME_PREFIX + "[\\w-]+$";
+
+ /** Property key used to carry the built-in job template version. */
+ String PROPERTY_VERSION_KEY = "version";
+
+ /** Regex for version property value (e.g., v1, v2). */
+ String VERSION_VALUE_PATTERN = "v\\d+";
+
+ /**
+ * Return all job templates provided by this implementation.
+ *
+ * @return a list of job templates to register
+ */
+ List<? extends JobTemplate> jobTemplates();
+}
diff --git a/build.gradle.kts b/build.gradle.kts
index aaa743b7ac..eb8b45ade9 100644
--- a/build.gradle.kts
+++ b/build.gradle.kts
@@ -314,14 +314,22 @@ subprojects {
}
fun compatibleWithJDK8(project: Project): Boolean {
+ val name = project.name.lowercase()
+ val path = project.path.lowercase()
+ if (path.startsWith(":maintenance:jobs") ||
+ name == "api" ||
+ name == "common" ||
+ name == "catalog-common" ||
+ name == "hadoop-common"
+ ) {
+ return true
+ }
+
val isReleaseRun = gradle.startParameter.taskNames.any { it == "release"
|| it == "publish" || it == "publishToMavenLocal" }
if (!isReleaseRun) {
return false
}
- val name = project.name.lowercase()
- val path = project.path.lowercase()
-
if (path.startsWith(":client") ||
path.startsWith(":spark-connector") ||
path.startsWith(":flink-connector") ||
@@ -330,12 +338,6 @@ subprojects {
return true
}
- if (name == "api" || name == "common" ||
- name == "catalog-common" || name == "hadoop-common"
- ) {
- return true
- }
-
return false
}
extensions.extraProperties.set("excludePackagesForSparkConnector",
::excludePackagesForSparkConnector)
@@ -713,6 +715,7 @@ tasks {
"copySubprojectDependencies",
"copySubprojectLib",
"copyCliLib",
+ "copyJobsLib",
":authorizations:copyLibAndConfig",
":iceberg:iceberg-rest-server:copyLibAndConfigs",
":lance:lance-rest-server:copyLibAndConfigs",
@@ -1015,6 +1018,15 @@ tasks {
setDuplicatesStrategy(DuplicatesStrategy.EXCLUDE)
}
+ register("copyJobsLib", Copy::class) {
+ dependsOn(":maintenance:jobs:build")
+ from("maintenance/jobs/build/libs")
+ into("distribution/package/auxlib")
+ include("gravitino-jobs-*.jar")
+ exclude("*-empty.jar")
+ setDuplicatesStrategy(DuplicatesStrategy.EXCLUDE)
+ }
+
register("copySubprojectLib", Copy::class) {
subprojects.forEach() {
if (!it.name.startsWith("authorization") &&
diff --git
a/clients/client-java/src/main/java/org/apache/gravitino/client/ErrorHandlers.java
b/clients/client-java/src/main/java/org/apache/gravitino/client/ErrorHandlers.java
index 705731808f..62806e106f 100644
---
a/clients/client-java/src/main/java/org/apache/gravitino/client/ErrorHandlers.java
+++
b/clients/client-java/src/main/java/org/apache/gravitino/client/ErrorHandlers.java
@@ -34,6 +34,7 @@ import
org.apache.gravitino.exceptions.ConnectionFailedException;
import org.apache.gravitino.exceptions.FilesetAlreadyExistsException;
import org.apache.gravitino.exceptions.ForbiddenException;
import org.apache.gravitino.exceptions.GroupAlreadyExistsException;
+import org.apache.gravitino.exceptions.IllegalJobTemplateOperationException;
import org.apache.gravitino.exceptions.IllegalMetadataObjectException;
import org.apache.gravitino.exceptions.IllegalPrivilegeException;
import org.apache.gravitino.exceptions.IllegalRoleException;
@@ -1165,7 +1166,13 @@ public class ErrorHandlers {
switch (errorResponse.getCode()) {
case ErrorConstants.ILLEGAL_ARGUMENTS_CODE:
- throw new IllegalArgumentException(errorMsg);
+ if (errorResponse
+ .getType()
+
.equals(IllegalJobTemplateOperationException.class.getSimpleName())) {
+ throw new IllegalJobTemplateOperationException(errorMsg);
+ } else {
+ throw new IllegalArgumentException(errorMsg);
+ }
case ErrorConstants.NOT_FOUND_CODE:
if
(errorResponse.getType().equals(NoSuchMetalakeException.class.getSimpleName()))
{
diff --git
a/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/JobIT.java
b/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/JobIT.java
index f5e66b304d..9a0096513f 100644
---
a/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/JobIT.java
+++
b/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/JobIT.java
@@ -110,7 +110,6 @@ public class JobIT extends BaseIT {
Assertions.assertDoesNotThrow(() ->
metalake.registerJobTemplate(template2));
List<JobTemplate> registeredTemplates = metalake.listJobTemplates();
- Assertions.assertEquals(2, registeredTemplates.size());
Assertions.assertTrue(registeredTemplates.contains(template1));
Assertions.assertTrue(registeredTemplates.contains(template2));
@@ -140,7 +139,6 @@ public class JobIT extends BaseIT {
Assertions.assertDoesNotThrow(() ->
metalake.registerJobTemplate(template2));
List<JobTemplate> registeredTemplates = metalake.listJobTemplates();
- Assertions.assertEquals(2, registeredTemplates.size());
Assertions.assertTrue(registeredTemplates.contains(template1));
Assertions.assertTrue(registeredTemplates.contains(template2));
@@ -160,7 +158,6 @@ public class JobIT extends BaseIT {
// Verify the list of job templates after deletion
registeredTemplates = metalake.listJobTemplates();
- Assertions.assertEquals(1, registeredTemplates.size());
Assertions.assertTrue(registeredTemplates.contains(template2));
// Test deleting a non-existent job template
@@ -172,10 +169,6 @@ public class JobIT extends BaseIT {
// Verify the second job template is deleted
Assertions.assertThrows(
NoSuchJobTemplateException.class, () ->
metalake.getJobTemplate(template2.name()));
-
- // Verify the list of job templates is empty after deleting both
- registeredTemplates = metalake.listJobTemplates();
- Assertions.assertTrue(registeredTemplates.isEmpty());
}
@Test
diff --git
a/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/authorization/JobAuthorizationIT.java
b/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/authorization/JobAuthorizationIT.java
index a2841cb9b2..fea6cc8094 100644
---
a/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/authorization/JobAuthorizationIT.java
+++
b/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/authorization/JobAuthorizationIT.java
@@ -117,11 +117,14 @@ public class JobAuthorizationIT extends
BaseRestApiAuthorizationIT {
// Normal user can see job templates they own (test_1, test_2)
List<JobTemplate> normalUserTemplates =
normalUserClient.loadMetalake(METALAKE).listJobTemplates();
- Assertions.assertEquals(2, normalUserTemplates.size());
+ Assertions.assertTrue(normalUserTemplates.stream().anyMatch(s ->
s.name().equals("test_1")));
+ Assertions.assertTrue(normalUserTemplates.stream().anyMatch(s ->
s.name().equals("test_2")));
// Admin can see all job templates (test_1, test_2, test_3)
List<JobTemplate> adminTemplates =
client.loadMetalake(METALAKE).listJobTemplates();
- Assertions.assertEquals(3, adminTemplates.size());
+ Assertions.assertTrue(adminTemplates.stream().anyMatch(s ->
s.name().equals("test_1")));
+ Assertions.assertTrue(adminTemplates.stream().anyMatch(s ->
s.name().equals("test_2")));
+ Assertions.assertTrue(adminTemplates.stream().anyMatch(s ->
s.name().equals("test_3")));
}
@Test
diff --git a/clients/client-python/gravitino/exceptions/base.py
b/clients/client-python/gravitino/exceptions/base.py
index de7ef01bce..1661dc6559 100644
--- a/clients/client-python/gravitino/exceptions/base.py
+++ b/clients/client-python/gravitino/exceptions/base.py
@@ -181,6 +181,14 @@ class
JobTemplateAlreadyExistsException(AlreadyExistsException):
"""An exception thrown when a job template with specified name already
exists."""
+class IllegalJobTemplateOperationException(IllegalArgumentException):
+ """An exception thrown when an illegal operation is attempted on a job
template.
+
+ This exception is raised when attempting to modify or delete a built-in
job template,
+ or when trying to create a user template with a reserved name (e.g.,
starting with 'builtin-').
+ """
+
+
class NoSuchJobTemplateException(NotFoundException):
"""An exception thrown when a job template with specified name is not
found."""
diff --git
a/clients/client-python/gravitino/exceptions/handlers/job_error_handler.py
b/clients/client-python/gravitino/exceptions/handlers/job_error_handler.py
index fc96e4c2b8..a769cd1605 100644
--- a/clients/client-python/gravitino/exceptions/handlers/job_error_handler.py
+++ b/clients/client-python/gravitino/exceptions/handlers/job_error_handler.py
@@ -21,6 +21,7 @@ from gravitino.exceptions.handlers.rest_error_handler import
RestErrorHandler
from gravitino.exceptions.base import (
NoSuchMetalakeException,
JobTemplateAlreadyExistsException,
+ IllegalJobTemplateOperationException,
NoSuchJobTemplateException,
NoSuchJobException,
InUseException,
@@ -34,6 +35,10 @@ class JobErrorHandler(RestErrorHandler):
code = error_response.code()
exception_type = error_response.type()
+ if code == ErrorConstants.ILLEGAL_ARGUMENTS_CODE:
+ if exception_type == IllegalJobTemplateOperationException.__name__:
+ raise IllegalJobTemplateOperationException(error_message)
+
if code == ErrorConstants.NOT_FOUND_CODE:
if exception_type == NoSuchMetalakeException.__name__:
raise NoSuchMetalakeException(error_message)
diff --git a/clients/client-python/tests/integration/test_supports_jobs.py
b/clients/client-python/tests/integration/test_supports_jobs.py
index 4a5195a530..e613085cea 100644
--- a/clients/client-python/tests/integration/test_supports_jobs.py
+++ b/clients/client-python/tests/integration/test_supports_jobs.py
@@ -95,7 +95,6 @@ class TestSupportsJobs(IntegrationTestEnv):
# List registered job templates
registered_templates = self._metalake.list_job_templates()
- self.assertEqual(len(registered_templates), 2)
self.assertIn(template_1, registered_templates)
self.assertIn(template_2, registered_templates)
@@ -127,7 +126,6 @@ class TestSupportsJobs(IntegrationTestEnv):
# List registered job templates
registered_templates = self._metalake.list_job_templates()
- self.assertEqual(len(registered_templates), 2)
self.assertIn(template1, registered_templates)
self.assertIn(template2, registered_templates)
@@ -150,7 +148,6 @@ class TestSupportsJobs(IntegrationTestEnv):
# Verify the list of job templates after deletion
registered_templates = self._metalake.list_job_templates()
- self.assertEqual(len(registered_templates), 1)
self.assertIn(template2, registered_templates)
# Test deleting a non-existent job template
@@ -163,10 +160,6 @@ class TestSupportsJobs(IntegrationTestEnv):
with self.assertRaises(NoSuchJobTemplateException):
self._metalake.get_job_template(template2.name)
- # Verify the list of job templates is empty after deleting both
- registered_templates = self._metalake.list_job_templates()
- self.assertTrue(len(registered_templates) == 0)
-
def test_register_and_alter_job_template(self):
template = self.builder.with_name("test_alter").build()
self._metalake.register_job_template(template)
diff --git a/common/src/test/java/org/apache/gravitino/json/TestSerializer.java
b/common/src/test/java/org/apache/gravitino/json/TestSerializer.java
index a8fff05622..f5e11a9778 100644
--- a/common/src/test/java/org/apache/gravitino/json/TestSerializer.java
+++ b/common/src/test/java/org/apache/gravitino/json/TestSerializer.java
@@ -57,8 +57,8 @@ public class TestSerializer {
String actualJson =
JsonUtils.anyFieldMapper().writeValueAsString(DTOConverters.toDTO(distribution));
String expectedJson =
- """
-
{"strategy":"even","number":10,"funcArgs":[{"type":"field","fieldName":["col1"]}]}""";
+ "{\"strategy\":\"even\",\"number\":10,\"funcArgs\":"
+ + "[{\"type\":\"field\",\"fieldName\":[\"col1\"]}]}";
Assertions.assertEquals(expectedJson, actualJson);
DistributionDTO deserialized =
JsonUtils.anyFieldMapper().readValue(actualJson,
DistributionDTO.class);
@@ -73,10 +73,10 @@ public class TestSerializer {
"bucket", Literals.integerLiteral(10),
NamedReference.field("col_1")));
actualJson =
JsonUtils.anyFieldMapper().writeValueAsString(DTOConverters.toDTO(distribution));
expectedJson =
- """
-
{"strategy":"even","number":10,"funcArgs":[{"type":"function","funcName":"bucket",\
-
"funcArgs":[{"type":"literal","dataType":"integer","value":"10"},{"type":"field",\
- "fieldName":["col_1"]}]}]}""";
+
"{\"strategy\":\"even\",\"number\":10,\"funcArgs\":[{\"type\":\"function\","
+ + "\"funcName\":\"bucket\",\"funcArgs\":[{\"type\":\"literal\","
+ + "\"dataType\":\"integer\",\"value\":\"10\"},{\"type\":\"field\","
+ + "\"fieldName\":[\"col_1\"]}]}]}";
Assertions.assertEquals(expectedJson, actualJson);
deserialized = JsonUtils.anyFieldMapper().readValue(actualJson,
DistributionDTO.class);
Assertions.assertEquals(distribution, DTOConverters.fromDTO(deserialized));
@@ -90,9 +90,8 @@ public class TestSerializer {
String actualJson =
JsonUtils.anyFieldMapper().writeValueAsString(DTOConverters.toDTO(sortOrder));
String expectedJson =
- """
-
{"sortTerm":{"type":"field","fieldName":["col1"]},"direction":"asc",\
- "nullOrdering":"nulls_last"}""";
+ "{\"sortTerm\":{\"type\":\"field\",\"fieldName\":[\"col1\"]},"
+ + "\"direction\":\"asc\",\"nullOrdering\":\"nulls_last\"}";
Assertions.assertEquals(expectedJson, actualJson);
SortOrderDTO deserialized =
@@ -105,9 +104,9 @@ public class TestSerializer {
SortDirection.DESCENDING);
actualJson =
JsonUtils.anyFieldMapper().writeValueAsString(DTOConverters.toDTO(sortOrder));
expectedJson =
- """
-
{"sortTerm":{"type":"function","funcName":"lower","funcArgs":[{"type":"field",\
-
"fieldName":["col_1"]}]},"direction":"desc","nullOrdering":"nulls_last"}""";
+ "{\"sortTerm\":{\"type\":\"function\",\"funcName\":\"lower\","
+ + "\"funcArgs\":[{\"type\":\"field\",\"fieldName\":[\"col_1\"]}]},"
+ + "\"direction\":\"desc\",\"nullOrdering\":\"nulls_last\"}";
Assertions.assertEquals(expectedJson, actualJson);
deserialized = JsonUtils.anyFieldMapper().readValue(actualJson,
SortOrderDTO.class);
Assertions.assertEquals(sortOrder, DTOConverters.fromDTO(deserialized));
@@ -121,8 +120,7 @@ public class TestSerializer {
String actualJson =
JsonUtils.anyFieldMapper().writeValueAsString((DTOConverters.toDTO(index)));
String expectedJson =
- """
-
{"indexType":"PRIMARY_KEY","name":"index_1","fieldNames":[["col1"]]}""";
+ "{\"indexType\":\"PRIMARY_KEY\",\"name\":\"index_1\"," +
"\"fieldNames\":[[\"col1\"]]}";
Assertions.assertEquals(expectedJson, actualJson);
IndexDTO deserialized = JsonUtils.anyFieldMapper().readValue(actualJson,
IndexDTO.class);
Assertions.assertEquals(index, DTOConverters.fromDTO(deserialized));
@@ -135,8 +133,8 @@ public class TestSerializer {
new String[][] {new String[] {"col1"}, new String[] {"col2"}});
actualJson = JsonUtils.anyFieldMapper().writeValueAsString(index);
expectedJson =
- """
-
{"indexType":"unique_key","name":"index_2","fieldNames":[["col1"],["col2"]]}""";
+ "{\"indexType\":\"unique_key\",\"name\":\"index_2\","
+ + "\"fieldNames\":[[\"col1\"],[\"col2\"]]}";
Assertions.assertEquals(expectedJson, actualJson);
}
@@ -146,8 +144,7 @@ public class TestSerializer {
Transform transform = DayPartitioningDTO.of("dt");
String actualJson =
JsonUtils.anyFieldMapper().writeValueAsString(DTOConverters.toDTO(transform));
- String expectedJson = """
- {"strategy":"day","fieldName":["dt"]}""";
+ String expectedJson = "{\"strategy\":\"day\",\"fieldName\":[\"dt\"]}";
Assertions.assertEquals(expectedJson, actualJson);
Partitioning deserialized =
@@ -175,10 +172,11 @@ public class TestSerializer {
actualJson = JsonUtils.anyFieldMapper().writeValueAsString(transform);
expectedJson =
- """
-
{"type":"range_partitioning","fieldName":["dt"],"assignments":[{"name":"p1",
-
"properties":null,"upper":{"type":"literal","dataType":"string","value":"2024-01-01"},
-
"lower":{"type":"literal","dataType":"string","value":"2023-01-01"}}]}""";
+ "{\"type\":\"range_partitioning\",\"fieldName\":[\"dt\"],"
+ + "\"assignments\":[{\"name\":\"p1\",\"properties\":null,"
+ + "\"upper\":{\"type\":\"literal\",\"dataType\":\"string\","
+ + "\"value\":\"2024-01-01\"},\"lower\":{\"type\":\"literal\","
+ + "\"dataType\":\"string\",\"value\":\"2023-01-01\"}}]}";
Assertions.assertEquals(expectedJson, actualJson);
deserialized = JsonUtils.anyFieldMapper().readValue(actualJson,
Partitioning.class);
Assertions.assertEquals(transform, DTOConverters.fromDTO(deserialized));
diff --git a/core/src/main/java/org/apache/gravitino/GravitinoEnv.java
b/core/src/main/java/org/apache/gravitino/GravitinoEnv.java
index aa58ddf855..a4403c8126 100644
--- a/core/src/main/java/org/apache/gravitino/GravitinoEnv.java
+++ b/core/src/main/java/org/apache/gravitino/GravitinoEnv.java
@@ -63,8 +63,10 @@ import org.apache.gravitino.hook.SchemaHookDispatcher;
import org.apache.gravitino.hook.TableHookDispatcher;
import org.apache.gravitino.hook.TagHookDispatcher;
import org.apache.gravitino.hook.TopicHookDispatcher;
+import org.apache.gravitino.job.BuiltInJobTemplateEventListener;
import org.apache.gravitino.job.JobManager;
import org.apache.gravitino.job.JobOperationDispatcher;
+import org.apache.gravitino.job.JobTemplateValidationDispatcher;
import org.apache.gravitino.listener.AccessControlEventDispatcher;
import org.apache.gravitino.listener.CatalogEventDispatcher;
import org.apache.gravitino.listener.EventBus;
@@ -655,8 +657,16 @@ public class GravitinoEnv {
new PolicyEventDispatcher(eventBus, new PolicyManager(idGenerator,
entityStore));
this.policyDispatcher = new PolicyHookDispatcher(policyEventDispatcher);
+ JobManager jobManager = new JobManager(config, entityStore, idGenerator);
+ JobTemplateValidationDispatcher validationDispatcher =
+ new JobTemplateValidationDispatcher(jobManager);
this.jobOperationDispatcher =
- new JobEventDispatcher(
- eventBus, new JobHookDispatcher(new JobManager(config,
entityStore, idGenerator)));
+ new JobEventDispatcher(eventBus, new
JobHookDispatcher(validationDispatcher));
+
+ // Register built-in job template event listener to automatically register
templates
+ // when metalakes are created
+ BuiltInJobTemplateEventListener builtInJobTemplateListener =
+ new BuiltInJobTemplateEventListener(jobManager, entityStore,
idGenerator);
+ eventListenerManager.addEventListener("builtin-job-template",
builtInJobTemplateListener);
}
}
diff --git
a/core/src/main/java/org/apache/gravitino/job/BuiltInJobTemplateEventListener.java
b/core/src/main/java/org/apache/gravitino/job/BuiltInJobTemplateEventListener.java
new file mode 100644
index 0000000000..a445aa13aa
--- /dev/null
+++
b/core/src/main/java/org/apache/gravitino/job/BuiltInJobTemplateEventListener.java
@@ -0,0 +1,395 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.job;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Maps;
+import java.io.File;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.net.URLClassLoader;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.ServiceLoader;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.EntityStore;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.exceptions.JobTemplateAlreadyExistsException;
+import org.apache.gravitino.exceptions.NoSuchEntityException;
+import org.apache.gravitino.listener.api.EventListenerPlugin;
+import org.apache.gravitino.listener.api.event.CreateMetalakeEvent;
+import org.apache.gravitino.listener.api.event.Event;
+import org.apache.gravitino.lock.LockType;
+import org.apache.gravitino.lock.TreeLockUtils;
+import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.meta.JobTemplateEntity;
+import org.apache.gravitino.metalake.MetalakeManager;
+import org.apache.gravitino.storage.IdGenerator;
+import org.apache.gravitino.utils.NameIdentifierUtil;
+import org.apache.gravitino.utils.NamespaceUtil;
+import org.apache.gravitino.utils.PrincipalUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Event listener that automatically registers built-in job templates when
metalakes are created.
+ *
+ * <p>This listener monitors metalake creation events and registers all
discovered built-in job
+ * templates (via JobTemplateProvider SPI) into the newly created metalake. It
also handles
+ * registration for existing metalakes on first startup.
+ */
+public class BuiltInJobTemplateEventListener implements EventListenerPlugin {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(BuiltInJobTemplateEventListener.class);
+
+ private static final Pattern BUILTIN_JOB_TEMPLATE_NAME_PATTERN =
+ Pattern.compile(JobTemplateProvider.BUILTIN_NAME_PATTERN);
+ private static final Pattern VERSION_PATTERN =
+ Pattern.compile(JobTemplateProvider.VERSION_VALUE_PATTERN);
+
+ private final JobManager jobManager;
+ private final EntityStore entityStore;
+ private final IdGenerator idGenerator;
+
+ public BuiltInJobTemplateEventListener(
+ JobManager jobManager, EntityStore entityStore, IdGenerator idGenerator)
{
+ this.jobManager = jobManager;
+ this.entityStore = entityStore;
+ this.idGenerator = idGenerator;
+ }
+
+ @Override
+ public void init(Map<String, String> properties) throws RuntimeException {
+ // No-op: jobManager, entityStore and idGenerator are final fields set by the constructor
+ // This is called from EventListenerManager before start()
+ }
+
+ @Override
+ public void start() throws RuntimeException {
+ // Register built-in job templates for all existing metalakes on first
startup
+ try {
+ List<String> existingMetalakes =
MetalakeManager.listInUseMetalakes(entityStore);
+ if (existingMetalakes.isEmpty()) {
+ return;
+ }
+
+ LOG.info(
+ "Registering built-in job templates for {} existing metalakes",
existingMetalakes.size());
+
+ Map<String, JobTemplate> builtInTemplates = loadBuiltInJobTemplates();
+ if (builtInTemplates.isEmpty()) {
+ LOG.info("No built-in job templates discovered via
JobTemplateProvider");
+ return;
+ }
+
+ existingMetalakes.forEach(
+ metalake -> {
+ try {
+ reconcileBuiltInJobTemplates(metalake, builtInTemplates);
+ } catch (Exception e) {
+ LOG.error("Failed to register built-in job templates for
metalake: {}", metalake, e);
+ }
+ });
+
+ } catch (Exception e) {
+ LOG.error("Failed to register built-in job templates for existing
metalakes", e);
+ }
+ }
+
+ @Override
+ public void stop() throws RuntimeException {
+ // No resources to clean up
+ }
+
+ @Override
+ public void onPostEvent(Event postEvent) throws RuntimeException {
+ if (postEvent instanceof CreateMetalakeEvent) {
+ CreateMetalakeEvent event = (CreateMetalakeEvent) postEvent;
+ String metalakeName = event.identifier().name();
+
+ try {
+ Map<String, JobTemplate> builtInTemplates = loadBuiltInJobTemplates();
+ if (builtInTemplates.isEmpty()) {
+ LOG.debug("No built-in job templates to register for metalake: {}",
metalakeName);
+ return;
+ }
+
+ reconcileBuiltInJobTemplates(metalakeName, builtInTemplates);
+ LOG.info("Registered built-in job templates for metalake: {}",
metalakeName);
+ } catch (Exception e) {
+ LOG.error("Failed to register built-in job templates for metalake:
{}", metalakeName, e);
+ }
+ }
+ }
+
+ @Override
+ public Mode mode() {
+ // Use async isolated to avoid blocking metalake creation
+ return Mode.ASYNC_ISOLATED;
+ }
+
+  /**
+   * Discovers built-in job templates through {@link JobTemplateProvider} implementations found
+   * by {@link ServiceLoader}.
+   *
+   * <p>Templates rejected by {@code isValidBuiltInJobTemplate} are skipped with a warning. When
+   * several templates share a name, only the one with the highest version is kept; on a tie the
+   * first one seen wins.
+   *
+   * <p>The class loader used for discovery is closed afterwards only when it is a dedicated
+   * {@link URLClassLoader}. {@code createAuxlibClassLoader()} may fall back to the thread
+   * context class loader; that loader is not owned by this method and must not be closed here,
+   * otherwise later class and resource lookups through it could fail.
+   *
+   * @return a mutable map from template name to the selected template; empty when none is found
+   */
+  @VisibleForTesting
+  Map<String, JobTemplate> loadBuiltInJobTemplates() {
+    Map<String, JobTemplate> builtInTemplates = Maps.newHashMap();
+
+    // Captured up front so the cleanup below can tell a dedicated loader from the shared one.
+    ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
+    ClassLoader auxlibClassLoader = null;
+    try {
+      auxlibClassLoader = createAuxlibClassLoader();
+      ServiceLoader<JobTemplateProvider> loader =
+          ServiceLoader.load(JobTemplateProvider.class, auxlibClassLoader);
+
+      loader.forEach(
+          provider ->
+              provider
+                  .jobTemplates()
+                  .forEach(
+                      template -> {
+                        if (!isValidBuiltInJobTemplate(template)) {
+                          LOG.warn("Skip invalid built-in job template {}", template.name());
+                          return;
+                        }
+
+                        // Keep the highest version per name; strict '>' keeps the first on ties.
+                        JobTemplate existing = builtInTemplates.get(template.name());
+                        if (existing == null
+                            || version(template.customFields())
+                                > version(existing.customFields())) {
+                          builtInTemplates.put(template.name(), template);
+                        }
+                      }));
+      return builtInTemplates;
+
+    } finally {
+      boolean ownsLoader =
+          auxlibClassLoader instanceof URLClassLoader
+              && auxlibClassLoader != contextClassLoader;
+      if (ownsLoader) {
+        try {
+          ((URLClassLoader) auxlibClassLoader).close();
+        } catch (IOException e) {
+          LOG.warn("Failed to close auxlib classloader", e);
+        }
+      }
+    }
+  }
+
+  /**
+   * Picks the class loader used to discover built-in job template providers.
+   *
+   * <p>A new {@link URLClassLoader} over the {@code gravitino-jobs-*.jar} files in {@code
+   * $GRAVITINO_HOME/auxlib} is returned when such files exist. In every other case (environment
+   * variable unset, directory missing, no matching JAR, or a failure while building the loader)
+   * the thread context class loader is returned instead.
+   *
+   * @return the class loader to hand to {@link java.util.ServiceLoader}
+   */
+  private ClassLoader createAuxlibClassLoader() {
+    String home = System.getenv("GRAVITINO_HOME");
+    if (home == null) {
+      LOG.warn("GRAVITINO_HOME not set, using current classloader for built-in job templates");
+      return Thread.currentThread().getContextClassLoader();
+    }
+
+    File auxlib = new File(home, "auxlib");
+    boolean usableDirectory = auxlib.exists() && auxlib.isDirectory();
+    if (!usableDirectory) {
+      LOG.warn(
+          "Auxlib directory {} does not exist, using current classloader for built-in job templates",
+          auxlib.getAbsolutePath());
+      return Thread.currentThread().getContextClassLoader();
+    }
+
+    // Only load gravitino-jobs-*.jar files
+    File[] jobJars =
+        auxlib.listFiles(
+            (parent, fileName) ->
+                fileName.startsWith("gravitino-jobs-") && fileName.endsWith(".jar"));
+    if (jobJars == null || jobJars.length == 0) {
+      LOG.info(
+          "No gravitino-jobs JAR files found in auxlib directory {}", auxlib.getAbsolutePath());
+      return Thread.currentThread().getContextClassLoader();
+    }
+
+    try {
+      URI[] jarUris = Arrays.stream(jobJars).map(File::toURI).toArray(URI[]::new);
+      LOG.info(
+          "Loading built-in job templates from {} gravitino-jobs JAR file(s) in auxlib directory",
+          jarUris.length);
+
+      java.net.URL[] jarUrls = new java.net.URL[jarUris.length];
+      for (int i = 0; i < jarUris.length; i++) {
+        try {
+          jarUrls[i] = jarUris[i].toURL();
+        } catch (MalformedURLException e) {
+          throw new RuntimeException("Failed to convert URI to URL: " + jarUris[i], e);
+        }
+      }
+      return new URLClassLoader(jarUrls, Thread.currentThread().getContextClassLoader());
+    } catch (Exception e) {
+      LOG.error("Failed to create auxlib classloader", e);
+      return Thread.currentThread().getContextClassLoader();
+    }
+  }
+
+  /**
+   * Tells whether a discovered template may be treated as built-in: its name must pass {@code
+   * isValidBuiltInName} and its custom fields must carry a parsable version.
+   *
+   * @param jobTemplate the template to check
+   * @return {@code true} when both the name and the version are acceptable
+   */
+  private boolean isValidBuiltInJobTemplate(JobTemplate jobTemplate) {
+    return isValidBuiltInName(jobTemplate.name())
+        && getVersion(jobTemplate.customFields()).isPresent();
+  }
+
+  /**
+   * Brings the built-in job templates stored under a metalake in line with the discovered ones.
+   *
+   * <ul>
+   *   <li>a discovered template that is not stored yet is registered;
+   *   <li>a stored template with a lower version than the discovered one is updated;
+   *   <li>a stored built-in template that is no longer discovered is deleted.
+   * </ul>
+   *
+   * @param metalake the metalake to reconcile
+   * @param builtInTemplates the discovered templates, keyed by name
+   */
+  @VisibleForTesting
+  void reconcileBuiltInJobTemplates(String metalake, Map<String, JobTemplate> builtInTemplates) {
+    // Stored templates with a built-in name; entries still left at the end are obsolete.
+    Map<String, JobTemplateEntity> leftovers =
+        jobManager.listJobTemplates(metalake).stream()
+            .filter(stored -> isValidBuiltInName(stored.name()))
+            .collect(Collectors.toMap(JobTemplateEntity::name, stored -> stored));
+
+    for (Map.Entry<String, JobTemplate> discovered : builtInTemplates.entrySet()) {
+      String name = discovered.getKey();
+      JobTemplate newTemplate = discovered.getValue();
+
+      JobTemplateEntity stored = leftovers.remove(name);
+      if (stored == null) {
+        registerNewBuiltInJobTemplate(metalake, newTemplate);
+        continue;
+      }
+
+      int storedVersion = version(stored.templateContent().customFields());
+      int discoveredVersion = version(newTemplate.customFields());
+      if (discoveredVersion > storedVersion) {
+        updateBuiltInJobTemplate(metalake, stored, newTemplate);
+      } else {
+        LOG.info("Built-in job template {} under metalake {} is up to date", name, metalake);
+      }
+    }
+
+    for (JobTemplateEntity obsolete : leftovers.values()) {
+      deleteObsoleteBuiltInJobTemplate(metalake, obsolete);
+    }
+  }
+
+  /**
+   * Registers a built-in template that is not yet stored under the metalake. The entity is
+   * created with a freshly generated id and an audit record naming the current principal as
+   * creator. A concurrent registration of the same name is logged and otherwise ignored.
+   *
+   * @param metalake the target metalake
+   * @param jobTemplate the built-in template to register
+   */
+  private void registerNewBuiltInJobTemplate(String metalake, JobTemplate jobTemplate) {
+    AuditInfo creationAudit =
+        AuditInfo.builder()
+            .withCreator(PrincipalUtils.getCurrentPrincipal().getName())
+            .withCreateTime(Instant.now())
+            .build();
+
+    JobTemplateEntity newEntity =
+        JobTemplateEntity.builder()
+            .withId(idGenerator.nextId())
+            .withName(jobTemplate.name())
+            .withNamespace(NamespaceUtil.ofJobTemplate(metalake))
+            .withComment(jobTemplate.comment())
+            .withTemplateContent(JobTemplateEntity.TemplateContent.fromJobTemplate(jobTemplate))
+            .withAuditInfo(creationAudit)
+            .build();
+
+    try {
+      jobManager.registerJobTemplate(metalake, newEntity);
+      LOG.info(
+          "Registered built-in job template {} under metalake {}", jobTemplate.name(), metalake);
+    } catch (JobTemplateAlreadyExistsException alreadyThere) {
+      LOG.warn(
+          "Built-in job template {} already exists under metalake {}, skip registering",
+          jobTemplate.name(),
+          metalake,
+          alreadyThere);
+    }
+  }
+
+  /**
+   * Replaces the stored content of a built-in template with a newer version. The update runs
+   * under a WRITE tree lock on the template identifier; id, namespace, creator and creation time
+   * are carried over from the stored entity, while name, comment and content come from the new
+   * template and the current principal is recorded as last modifier.
+   *
+   * @param metalake the metalake owning the template
+   * @param existing the entity as it was listed before the update (used for logging)
+   * @param newTemplate the newer built-in template
+   * @throws RuntimeException if the entity is missing or the store fails
+   */
+  private void updateBuiltInJobTemplate(
+      String metalake, JobTemplateEntity existing, JobTemplate newTemplate) {
+    NameIdentifier templateIdent = NameIdentifierUtil.ofJobTemplate(metalake, newTemplate.name());
+
+    TreeLockUtils.doWithTreeLock(
+        templateIdent,
+        LockType.WRITE,
+        () -> {
+          try {
+            return entityStore.update(
+                templateIdent,
+                JobTemplateEntity.class,
+                Entity.EntityType.JOB_TEMPLATE,
+                stored ->
+                    JobTemplateEntity.builder()
+                        .withId(stored.id())
+                        .withName(newTemplate.name())
+                        .withNamespace(stored.namespace())
+                        .withComment(newTemplate.comment())
+                        .withTemplateContent(
+                            JobTemplateEntity.TemplateContent.fromJobTemplate(newTemplate))
+                        .withAuditInfo(
+                            AuditInfo.builder()
+                                .withCreator(stored.auditInfo().creator())
+                                .withCreateTime(stored.auditInfo().createTime())
+                                .withLastModifier(PrincipalUtils.getCurrentPrincipal().getName())
+                                .withLastModifiedTime(Instant.now())
+                                .build())
+                        .build());
+          } catch (NoSuchEntityException | IOException failure) {
+            String message =
+                String.format(
+                    "Failed to update built-in job template %s under metalake %s",
+                    newTemplate.name(), metalake);
+            throw new RuntimeException(message, failure);
+          }
+        });
+
+    LOG.info(
+        "Updated built-in job template {} under metalake {} from v{} to v{}",
+        newTemplate.name(),
+        metalake,
+        version(existing.templateContent().customFields()),
+        version(newTemplate.customFields()));
+  }
+
+  /**
+   * Removes a stored built-in template that is no longer provided. This is best effort: any
+   * failure is logged as a warning and not rethrown.
+   *
+   * @param metalake the metalake owning the template
+   * @param jobTemplate the stored entity to delete
+   */
+  private void deleteObsoleteBuiltInJobTemplate(String metalake, JobTemplateEntity jobTemplate) {
+    try {
+      boolean deleted = jobManager.deleteJobTemplate(metalake, jobTemplate.name());
+      if (!deleted) {
+        return;
+      }
+      LOG.info(
+          "Deleted obsolete built-in job template {} under metalake {}",
+          jobTemplate.name(),
+          metalake);
+    } catch (Exception failure) {
+      LOG.warn(
+          "Failed to delete obsolete built-in job template {} under metalake {}",
+          jobTemplate.name(),
+          metalake,
+          failure);
+    }
+  }
+
+ /**
+ * Checks a template name against {@code BUILTIN_JOB_TEMPLATE_NAME_PATTERN}.
+ *
+ * @param name the template name to check
+ * @return {@code true} when the whole name matches the pattern
+ */
+ private boolean isValidBuiltInName(String name) {
+ return BUILTIN_JOB_TEMPLATE_NAME_PATTERN.matcher(name).matches();
+ }
+
+  /**
+   * Extracts the numeric version stored under {@link JobTemplateProvider#PROPERTY_VERSION_KEY}.
+   *
+   * <p>The value must be non-blank and match {@code VERSION_PATTERN}; the number is everything
+   * after the first character. A value that matches the pattern but whose digits do not fit in
+   * an {@code int} is treated like any other invalid version instead of letting a {@link
+   * NumberFormatException} escape and abort template discovery or reconciliation.
+   *
+   * @param customFields the template's custom fields, may be {@code null}
+   * @return the parsed version, or empty when it is missing, malformed or out of range
+   */
+  private Optional<Integer> getVersion(Map<String, String> customFields) {
+    if (customFields == null) {
+      return Optional.empty();
+    }
+
+    String rawVersion = customFields.get(JobTemplateProvider.PROPERTY_VERSION_KEY);
+    if (StringUtils.isBlank(rawVersion)) {
+      return Optional.empty();
+    }
+
+    Matcher matcher = VERSION_PATTERN.matcher(rawVersion);
+    if (!matcher.matches()) {
+      return Optional.empty();
+    }
+
+    try {
+      return Optional.of(Integer.parseInt(rawVersion.substring(1)));
+    } catch (NumberFormatException e) {
+      // Reached when the digits overflow int; handled as "no valid version".
+      LOG.warn("Ignore built-in job template version {} that is not a valid integer", rawVersion);
+      return Optional.empty();
+    }
+  }
+
+ /**
+ * Returns the version parsed by {@code getVersion}, or 0 when no valid version is present.
+ *
+ * @param customFields the template's custom fields
+ * @return the parsed version, defaulting to 0
+ */
+ private int version(Map<String, String> customFields) {
+ return getVersion(customFields).orElse(0);
+ }
+}
diff --git a/core/src/main/java/org/apache/gravitino/job/JobManager.java
b/core/src/main/java/org/apache/gravitino/job/JobManager.java
index 49d3a900b9..dd68b6c161 100644
--- a/core/src/main/java/org/apache/gravitino/job/JobManager.java
+++ b/core/src/main/java/org/apache/gravitino/job/JobManager.java
@@ -19,7 +19,6 @@
package org.apache.gravitino.job;
-import static org.apache.gravitino.Metalake.PROPERTY_IN_USE;
import static org.apache.gravitino.metalake.MetalakeManager.checkMetalake;
import com.google.common.annotations.VisibleForTesting;
@@ -58,9 +57,9 @@ import
org.apache.gravitino.exceptions.NoSuchJobTemplateException;
import org.apache.gravitino.lock.LockType;
import org.apache.gravitino.lock.TreeLockUtils;
import org.apache.gravitino.meta.AuditInfo;
-import org.apache.gravitino.meta.BaseMetalake;
import org.apache.gravitino.meta.JobEntity;
import org.apache.gravitino.meta.JobTemplateEntity;
+import org.apache.gravitino.metalake.MetalakeManager;
import org.apache.gravitino.storage.IdGenerator;
import org.apache.gravitino.utils.NameIdentifierUtil;
import org.apache.gravitino.utils.NamespaceUtil;
@@ -536,7 +535,7 @@ public class JobManager implements JobOperationDispatcher {
@VisibleForTesting
void pullAndUpdateJobStatus() {
- List<String> metalakes = listInUseMetalakes(entityStore);
+ List<String> metalakes = MetalakeManager.listInUseMetalakes(entityStore);
for (String metalake : metalakes) {
// This unnecessary list all the jobs, we need to improve the code to
only list the active
// jobs.
@@ -626,7 +625,7 @@ public class JobManager implements JobOperationDispatcher {
@VisibleForTesting
void cleanUpStagingDirs() {
- List<String> metalakes = listInUseMetalakes(entityStore);
+ List<String> metalakes = MetalakeManager.listInUseMetalakes(entityStore);
for (String metalake : metalakes) {
List<JobEntity> finishedJobs =
@@ -823,24 +822,6 @@ public class JobManager implements JobOperationDispatcher {
}
}
- private static List<String> listInUseMetalakes(EntityStore entityStore) {
- try {
- List<BaseMetalake> metalakes =
- TreeLockUtils.doWithRootTreeLock(
- LockType.READ,
- () ->
- entityStore.list(
- Namespace.empty(), BaseMetalake.class,
Entity.EntityType.METALAKE));
- return metalakes.stream()
- .filter(
- m -> (boolean)
m.propertiesMetadata().getOrDefault(m.properties(), PROPERTY_IN_USE))
- .map(BaseMetalake::name)
- .collect(Collectors.toList());
- } catch (IOException e) {
- throw new RuntimeException("Failed to list in-use metalakes", e);
- }
- }
-
@VisibleForTesting
JobTemplateEntity updateJobTemplateEntity(
NameIdentifier jobTemplateIdent,
diff --git
a/core/src/main/java/org/apache/gravitino/job/JobTemplateValidationDispatcher.java
b/core/src/main/java/org/apache/gravitino/job/JobTemplateValidationDispatcher.java
new file mode 100644
index 0000000000..6a25735a5b
--- /dev/null
+++
b/core/src/main/java/org/apache/gravitino/job/JobTemplateValidationDispatcher.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.job;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.gravitino.exceptions.IllegalJobTemplateOperationException;
+import org.apache.gravitino.exceptions.InUseException;
+import org.apache.gravitino.exceptions.JobTemplateAlreadyExistsException;
+import org.apache.gravitino.exceptions.NoSuchJobException;
+import org.apache.gravitino.exceptions.NoSuchJobTemplateException;
+import org.apache.gravitino.meta.JobEntity;
+import org.apache.gravitino.meta.JobTemplateEntity;
+
+/**
+ * A {@link JobOperationDispatcher} decorator that keeps built-in job templates out of reach of
+ * user operations.
+ *
+ * <p>A template counts as built-in when its name starts with {@link
+ * JobTemplateProvider#BUILTIN_NAME_PREFIX}. Through this dispatcher such a name cannot be
+ * registered, altered, deleted, or used as the target of a rename; the call is rejected with an
+ * {@link IllegalJobTemplateOperationException} before it reaches the wrapped dispatcher. All
+ * other operations are forwarded unchanged.
+ */
+public class JobTemplateValidationDispatcher implements JobOperationDispatcher {
+
+  private final JobOperationDispatcher dispatcher;
+
+  /**
+   * Wraps the given dispatcher.
+   *
+   * @param dispatcher the dispatcher that performs the actual operations
+   */
+  public JobTemplateValidationDispatcher(JobOperationDispatcher dispatcher) {
+    this.dispatcher = dispatcher;
+  }
+
+  @Override
+  public List<JobTemplateEntity> listJobTemplates(String metalake) {
+    return dispatcher.listJobTemplates(metalake);
+  }
+
+  /** Rejects built-in names, then forwards the registration. */
+  @Override
+  public void registerJobTemplate(String metalake, JobTemplateEntity jobTemplateEntity)
+      throws JobTemplateAlreadyExistsException {
+    validateNotBuiltInTemplateName(jobTemplateEntity.name());
+    dispatcher.registerJobTemplate(metalake, jobTemplateEntity);
+  }
+
+  @Override
+  public JobTemplateEntity getJobTemplate(String metalake, String jobTemplateName)
+      throws NoSuchJobTemplateException {
+    return dispatcher.getJobTemplate(metalake, jobTemplateName);
+  }
+
+  /** Rejects deletion of built-in templates, then forwards the call. */
+  @Override
+  public boolean deleteJobTemplate(String metalake, String jobTemplateName) throws InUseException {
+    validateNotBuiltInTemplate(jobTemplateName, "delete");
+    return dispatcher.deleteJobTemplate(metalake, jobTemplateName);
+  }
+
+  /**
+   * Rejects changes to built-in templates and renames onto a built-in name, then forwards the
+   * call.
+   */
+  @Override
+  public JobTemplateEntity alterJobTemplate(
+      String metalake, String jobTemplateName, JobTemplateChange... changes)
+      throws NoSuchJobTemplateException, IllegalArgumentException {
+    validateNotBuiltInTemplate(jobTemplateName, "alter");
+
+    // When several renames are supplied, only the last one is checked.
+    Arrays.stream(changes)
+        .filter(change -> change instanceof JobTemplateChange.RenameJobTemplate)
+        .map(change -> ((JobTemplateChange.RenameJobTemplate) change).getNewName())
+        .reduce((earlier, later) -> later)
+        .ifPresent(this::validateNotBuiltInTemplateName);
+
+    return dispatcher.alterJobTemplate(metalake, jobTemplateName, changes);
+  }
+
+  @Override
+  public List<JobEntity> listJobs(String metalake, Optional<String> jobTemplateName)
+      throws NoSuchJobTemplateException {
+    return dispatcher.listJobs(metalake, jobTemplateName);
+  }
+
+  @Override
+  public JobEntity getJob(String metalake, String jobId) throws NoSuchJobException {
+    return dispatcher.getJob(metalake, jobId);
+  }
+
+  @Override
+  public JobEntity runJob(String metalake, String jobTemplateName, Map<String, String> jobConf)
+      throws NoSuchJobTemplateException {
+    return dispatcher.runJob(metalake, jobTemplateName, jobConf);
+  }
+
+  @Override
+  public JobEntity cancelJob(String metalake, String jobId) throws NoSuchJobException {
+    return dispatcher.cancelJob(metalake, jobId);
+  }
+
+  @Override
+  public void close() throws IOException {
+    dispatcher.close();
+  }
+
+  /**
+   * Ensures a name chosen by a user does not fall into the built-in namespace.
+   *
+   * @param templateName the job template name to validate
+   * @throws IllegalJobTemplateOperationException if the name starts with the built-in prefix
+   */
+  @VisibleForTesting
+  void validateNotBuiltInTemplateName(String templateName) {
+    if (!isBuiltInTemplateName(templateName)) {
+      return;
+    }
+    throw new IllegalJobTemplateOperationException(
+        "Job template name '%s' is reserved for built-in templates. "
+            + "User-created job templates cannot have names starting with '%s'",
+        templateName, JobTemplateProvider.BUILTIN_NAME_PREFIX);
+  }
+
+  /**
+   * Ensures the template targeted by an operation is not a built-in one.
+   *
+   * @param templateName the job template name to validate
+   * @param operation the operation being attempted (e.g., "delete", "alter"), used in the message
+   * @throws IllegalJobTemplateOperationException if the template is a built-in template
+   */
+  @VisibleForTesting
+  void validateNotBuiltInTemplate(String templateName, String operation) {
+    if (!isBuiltInTemplateName(templateName)) {
+      return;
+    }
+    throw new IllegalJobTemplateOperationException(
+        "Cannot %s built-in job template '%s'. "
+            + "Built-in job templates are managed by the system and cannot be modified or deleted"
+            + " by users",
+        operation, templateName);
+  }
+
+  /**
+   * Tells whether a name belongs to the built-in namespace.
+   *
+   * @param templateName the template name to check, may be {@code null}
+   * @return true if the name is non-null and starts with the built-in prefix, false otherwise
+   */
+  @VisibleForTesting
+  static boolean isBuiltInTemplateName(String templateName) {
+    if (templateName == null) {
+      return false;
+    }
+    return templateName.startsWith(JobTemplateProvider.BUILTIN_NAME_PREFIX);
+  }
+}
diff --git
a/core/src/main/java/org/apache/gravitino/metalake/MetalakeManager.java
b/core/src/main/java/org/apache/gravitino/metalake/MetalakeManager.java
index 9b2a968acf..fd6592d727 100644
--- a/core/src/main/java/org/apache/gravitino/metalake/MetalakeManager.java
+++ b/core/src/main/java/org/apache/gravitino/metalake/MetalakeManager.java
@@ -28,6 +28,7 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
import org.apache.gravitino.Entity.EntityType;
import org.apache.gravitino.EntityAlreadyExistsException;
import org.apache.gravitino.EntityStore;
@@ -129,6 +130,29 @@ public class MetalakeManager implements
MetalakeDispatcher, Closeable {
}
}
+ /**
+ * Lists the names of all in-use Metalakes.
+ *
+ * <p>All Metalake entities are read from the store while holding the root tree lock in READ
+ * mode; a Metalake is kept when the value resolved for {@code PROPERTY_IN_USE} through its
+ * properties metadata is {@code true}.
+ *
+ * @param entityStore The EntityStore the Metalakes are listed from.
+ * @return A list of names of in-use Metalakes.
+ * @throws RuntimeException If listing the Metalakes fails with an {@link IOException}.
+ */
+ public static List<String> listInUseMetalakes(EntityStore entityStore) {
+ try {
+ List<BaseMetalake> metalakes =
+ TreeLockUtils.doWithRootTreeLock(
+ LockType.READ,
+ () -> entityStore.list(Namespace.empty(), BaseMetalake.class,
EntityType.METALAKE));
+ return metalakes.stream()
+ .filter(
+ m -> (boolean)
m.propertiesMetadata().getOrDefault(m.properties(), PROPERTY_IN_USE))
+ .map(BaseMetalake::name)
+ .collect(Collectors.toList());
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to list in-use metalakes", e);
+ }
+ }
+
/**
* Lists all available Metalakes.
*
@@ -151,6 +175,7 @@ public class MetalakeManager implements MetalakeDispatcher,
Closeable {
throw new RuntimeException(ioe);
}
}
+
/**
* Loads a Metalake.
*
diff --git
a/core/src/test/java/org/apache/gravitino/job/TestBuiltInJobTemplateEventListener.java
b/core/src/test/java/org/apache/gravitino/job/TestBuiltInJobTemplateEventListener.java
new file mode 100644
index 0000000000..e5273e6f83
--- /dev/null
+++
b/core/src/test/java/org/apache/gravitino/job/TestBuiltInJobTemplateEventListener.java
@@ -0,0 +1,525 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.job;
+
+import static org.apache.gravitino.Configs.TREE_LOCK_CLEAN_INTERVAL;
+import static org.apache.gravitino.Configs.TREE_LOCK_MAX_NODE_IN_MEMORY;
+import static org.apache.gravitino.Configs.TREE_LOCK_MIN_NODE_IN_MEMORY;
+import static org.apache.gravitino.Metalake.PROPERTY_IN_USE;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.ServiceLoader;
+import java.util.function.Consumer;
+import org.apache.gravitino.Config;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.EntityStore;
+import org.apache.gravitino.GravitinoEnv;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.exceptions.JobTemplateAlreadyExistsException;
+import org.apache.gravitino.listener.api.event.CreateMetalakeEvent;
+import org.apache.gravitino.listener.api.event.Event;
+import org.apache.gravitino.listener.api.info.MetalakeInfo;
+import org.apache.gravitino.lock.LockManager;
+import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.meta.BaseMetalake;
+import org.apache.gravitino.meta.JobTemplateEntity;
+import org.apache.gravitino.meta.SchemaVersion;
+import org.apache.gravitino.storage.IdGenerator;
+import org.apache.gravitino.storage.RandomIdGenerator;
+import org.apache.gravitino.storage.memory.TestMemoryEntityStore;
+import org.apache.gravitino.utils.NamespaceUtil;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+import org.testcontainers.shaded.org.apache.commons.lang3.reflect.FieldUtils;
+
+public class TestBuiltInJobTemplateEventListener {
+
+ private BuiltInJobTemplateEventListener listener;
+ private JobManager jobManager;
+ private EntityStore entityStore;
+ private IdGenerator idGenerator;
+ private Config config;
+
+ @BeforeEach
+ public void setUp() throws IllegalAccessException {
+ config = Mockito.mock(Config.class);
+
+ doReturn(100000L).when(config).get(TREE_LOCK_MAX_NODE_IN_MEMORY);
+ doReturn(1000L).when(config).get(TREE_LOCK_MIN_NODE_IN_MEMORY);
+ doReturn(36000L).when(config).get(TREE_LOCK_CLEAN_INTERVAL);
+
+ entityStore = new TestMemoryEntityStore.InMemoryEntityStore();
+ entityStore.initialize(config);
+
+ FieldUtils.writeField(GravitinoEnv.getInstance(), "lockManager", new
LockManager(config), true);
+ idGenerator = new RandomIdGenerator();
+
+ jobManager = mock(JobManager.class);
+ listener = new BuiltInJobTemplateEventListener(jobManager, entityStore,
idGenerator);
+ }
+
+  /** Closes the entity store created in {@code setUp}, if there is one. */
+  @AfterEach
+  public void tearDown() throws IOException {
+    if (entityStore == null) {
+      return;
+    }
+    entityStore.close();
+  }
+
+  /** A {@link CreateMetalakeEvent} must get the discovered built-in template registered. */
+  @Test
+  public void testOnPostCreateMetalakeEvent() {
+    String metalake = "test_metalake";
+    CreateMetalakeEvent createEvent =
+        new CreateMetalakeEvent(
+            "user", NameIdentifier.of(metalake), new MetalakeInfo(metalake, "comment", null, null));
+
+    // One provider exposing a single valid built-in template.
+    ShellJobTemplate builtIn =
+        ShellJobTemplate.builder()
+            .withName("builtin-test")
+            .withComment("test template")
+            .withExecutable("/bin/echo")
+            .withCustomFields(Collections.singletonMap("version", "v1"))
+            .build();
+    JobTemplateProvider templateProvider = mock(JobTemplateProvider.class);
+    when(templateProvider.jobTemplates())
+        .thenAnswer(invocation -> Collections.singletonList(builtIn));
+
+    try (MockedStatic<ServiceLoader> staticLoader = mockStatic(ServiceLoader.class)) {
+      // Route ServiceLoader discovery to the mocked provider.
+      ServiceLoader<JobTemplateProvider> fakeLoader = mock(ServiceLoader.class);
+      staticLoader
+          .when(() -> ServiceLoader.load(eq(JobTemplateProvider.class), any()))
+          .thenReturn(fakeLoader);
+      when(fakeLoader.iterator()).thenReturn(Lists.newArrayList(templateProvider).iterator());
+      when(fakeLoader.spliterator())
+          .thenReturn(Lists.newArrayList(templateProvider).spliterator());
+      Mockito.doAnswer(
+              invocation -> {
+                Consumer<JobTemplateProvider> action = invocation.getArgument(0);
+                action.accept(templateProvider);
+                return null;
+              })
+          .when(fakeLoader)
+          .forEach(any());
+
+      // The metalake has no templates yet, so the built-in one has to be registered.
+      when(jobManager.listJobTemplates(metalake)).thenReturn(Collections.emptyList());
+      doNothing().when(jobManager).registerJobTemplate(eq(metalake), any());
+
+      listener.onPostEvent(createEvent);
+
+      verify(jobManager, times(1)).registerJobTemplate(eq(metalake), any());
+    }
+  }
+
+  /** Events other than metalake creation must leave the job manager untouched. */
+  @Test
+  public void testOnPostEventWithNonMetalakeEvent() {
+    Event unrelatedEvent = mock(Event.class);
+
+    // Must return normally without listing or registering anything.
+    listener.onPostEvent(unrelatedEvent);
+
+    verify(jobManager, times(0)).registerJobTemplate(any(), any());
+    verify(jobManager, times(0)).listJobTemplates(any());
+  }
+
+  /** On start, templates are reconciled for in-use metalakes only. */
+  @Test
+  public void testStartWithExistingMetalakes() throws IOException {
+    // Two in-use metalakes and one that is not in use.
+    createMetalake("metalake1", true);
+    createMetalake("metalake2", true);
+    createMetalake("metalake3", false);
+
+    ShellJobTemplate builtIn =
+        ShellJobTemplate.builder()
+            .withName("builtin-test")
+            .withComment("test")
+            .withExecutable("/bin/echo")
+            .withCustomFields(Collections.singletonMap("version", "v1"))
+            .build();
+    JobTemplateProvider templateProvider = mock(JobTemplateProvider.class);
+    when(templateProvider.jobTemplates())
+        .thenAnswer(invocation -> Collections.singletonList(builtIn));
+
+    try (MockedStatic<ServiceLoader> staticLoader = mockStatic(ServiceLoader.class)) {
+      // Route ServiceLoader discovery to the mocked provider.
+      ServiceLoader<JobTemplateProvider> fakeLoader = mock(ServiceLoader.class);
+      staticLoader
+          .when(() -> ServiceLoader.load(eq(JobTemplateProvider.class), any()))
+          .thenReturn(fakeLoader);
+      when(fakeLoader.iterator()).thenReturn(Lists.newArrayList(templateProvider).iterator());
+      when(fakeLoader.spliterator())
+          .thenReturn(Lists.newArrayList(templateProvider).spliterator());
+      Mockito.doAnswer(
+              invocation -> {
+                Consumer<JobTemplateProvider> action = invocation.getArgument(0);
+                action.accept(templateProvider);
+                return null;
+              })
+          .when(fakeLoader)
+          .forEach(any());
+
+      when(jobManager.listJobTemplates(any())).thenReturn(Collections.emptyList());
+      doNothing().when(jobManager).registerJobTemplate(any(), any());
+
+      listener.start();
+
+      // Only metalake1 and metalake2 are in use, hence exactly two registrations.
+      verify(jobManager, times(1)).listJobTemplates("metalake1");
+      verify(jobManager, times(1)).listJobTemplates("metalake2");
+      verify(jobManager, times(0)).listJobTemplates("metalake3");
+      verify(jobManager, times(2)).registerJobTemplate(any(), any());
+    }
+  }
+
+  /** Starting against an empty entity store must not touch the job manager. */
+  @Test
+  public void testStartWithNoExistingMetalakes() throws IOException {
+    // Nothing was created, so there is no metalake to reconcile.
+    listener.start();
+
+    verify(jobManager, times(0)).registerJobTemplate(any(), any());
+    verify(jobManager, times(0)).listJobTemplates(any());
+  }
+
+  /** When two valid templates share a name, only the higher version is kept. */
+  @Test
+  public void testLoadBuiltInJobTemplatesWithValidTemplates() {
+    ShellJobTemplate olderVersion =
+        ShellJobTemplate.builder()
+            .withName("builtin-sparkpi")
+            .withComment("v1")
+            .withExecutable("/bin/echo")
+            .withCustomFields(Collections.singletonMap("version", "v1"))
+            .build();
+    ShellJobTemplate newerVersion =
+        ShellJobTemplate.builder()
+            .withName("builtin-sparkpi")
+            .withComment("v2")
+            .withExecutable("/bin/echo")
+            .withCustomFields(Collections.singletonMap("version", "v2"))
+            .build();
+
+    JobTemplateProvider templateProvider = mock(JobTemplateProvider.class);
+    when(templateProvider.jobTemplates())
+        .thenAnswer(invocation -> Arrays.asList(olderVersion, newerVersion));
+
+    try (MockedStatic<ServiceLoader> staticLoader = mockStatic(ServiceLoader.class)) {
+      // Route ServiceLoader discovery to the mocked provider.
+      ServiceLoader<JobTemplateProvider> fakeLoader = mock(ServiceLoader.class);
+      staticLoader
+          .when(() -> ServiceLoader.load(eq(JobTemplateProvider.class), any()))
+          .thenReturn(fakeLoader);
+      when(fakeLoader.iterator()).thenReturn(Lists.newArrayList(templateProvider).iterator());
+      when(fakeLoader.spliterator())
+          .thenReturn(Lists.newArrayList(templateProvider).spliterator());
+      Mockito.doAnswer(
+              invocation -> {
+                Consumer<JobTemplateProvider> action = invocation.getArgument(0);
+                action.accept(templateProvider);
+                return null;
+              })
+          .when(fakeLoader)
+          .forEach(any());
+
+      Map<String, JobTemplate> loaded = listener.loadBuiltInJobTemplates();
+
+      // v2 must win over v1.
+      Assertions.assertEquals(1, loaded.size());
+      Assertions.assertTrue(loaded.containsKey("builtin-sparkpi"));
+      Assertions.assertEquals(
+          "v2", loaded.get("builtin-sparkpi").customFields().get("version"));
+    }
+  }
+
+  /** Templates whose name lacks the built-in prefix are dropped during loading. */
+  @Test
+  public void testLoadBuiltInJobTemplatesWithInvalidNames() {
+    ShellJobTemplate missingPrefix =
+        ShellJobTemplate.builder()
+            .withName("sparkpi") // Missing builtin- prefix
+            .withComment("invalid")
+            .withExecutable("/bin/echo")
+            .withCustomFields(Collections.singletonMap("version", "v1"))
+            .build();
+    ShellJobTemplate wellNamed =
+        ShellJobTemplate.builder()
+            .withName("builtin-valid")
+            .withComment("valid")
+            .withExecutable("/bin/echo")
+            .withCustomFields(Collections.singletonMap("version", "v1"))
+            .build();
+
+    JobTemplateProvider templateProvider = mock(JobTemplateProvider.class);
+    when(templateProvider.jobTemplates())
+        .thenAnswer(invocation -> Arrays.asList(missingPrefix, wellNamed));
+
+    try (MockedStatic<ServiceLoader> staticLoader = mockStatic(ServiceLoader.class)) {
+      // Route ServiceLoader discovery to the mocked provider.
+      ServiceLoader<JobTemplateProvider> fakeLoader = mock(ServiceLoader.class);
+      staticLoader
+          .when(() -> ServiceLoader.load(eq(JobTemplateProvider.class), any()))
+          .thenReturn(fakeLoader);
+      when(fakeLoader.iterator()).thenReturn(Lists.newArrayList(templateProvider).iterator());
+      when(fakeLoader.spliterator())
+          .thenReturn(Lists.newArrayList(templateProvider).spliterator());
+      Mockito.doAnswer(
+              invocation -> {
+                Consumer<JobTemplateProvider> action = invocation.getArgument(0);
+                action.accept(templateProvider);
+                return null;
+              })
+          .when(fakeLoader)
+          .forEach(any());
+
+      Map<String, JobTemplate> loaded = listener.loadBuiltInJobTemplates();
+
+      // Only the correctly named template survives.
+      Assertions.assertEquals(1, loaded.size());
+      Assertions.assertTrue(loaded.containsKey("builtin-valid"));
+      Assertions.assertFalse(loaded.containsKey("sparkpi"));
+    }
+  }
+
+  /** Templates whose version string is malformed are dropped during loading. */
+  @Test
+  public void testLoadBuiltInJobTemplatesWithInvalidVersions() {
+    ShellJobTemplate malformedVersion =
+        ShellJobTemplate.builder()
+            .withName("builtin-invalid")
+            .withComment("invalid")
+            .withExecutable("/bin/echo")
+            .withCustomFields(Collections.singletonMap("version", "version1")) // Invalid format
+            .build();
+    ShellJobTemplate wellVersioned =
+        ShellJobTemplate.builder()
+            .withName("builtin-valid")
+            .withComment("valid")
+            .withExecutable("/bin/echo")
+            .withCustomFields(Collections.singletonMap("version", "v1"))
+            .build();
+
+    JobTemplateProvider templateProvider = mock(JobTemplateProvider.class);
+    when(templateProvider.jobTemplates())
+        .thenAnswer(invocation -> Arrays.asList(malformedVersion, wellVersioned));
+
+    try (MockedStatic<ServiceLoader> staticLoader = mockStatic(ServiceLoader.class)) {
+      // Route ServiceLoader discovery to the mocked provider.
+      ServiceLoader<JobTemplateProvider> fakeLoader = mock(ServiceLoader.class);
+      staticLoader
+          .when(() -> ServiceLoader.load(eq(JobTemplateProvider.class), any()))
+          .thenReturn(fakeLoader);
+      when(fakeLoader.iterator()).thenReturn(Lists.newArrayList(templateProvider).iterator());
+      when(fakeLoader.spliterator())
+          .thenReturn(Lists.newArrayList(templateProvider).spliterator());
+      Mockito.doAnswer(
+              invocation -> {
+                Consumer<JobTemplateProvider> action = invocation.getArgument(0);
+                action.accept(templateProvider);
+                return null;
+              })
+          .when(fakeLoader)
+          .forEach(any());
+
+      Map<String, JobTemplate> loaded = listener.loadBuiltInJobTemplates();
+
+      // Only the template with a well-formed version survives.
+      Assertions.assertEquals(1, loaded.size());
+      Assertions.assertTrue(loaded.containsKey("builtin-valid"));
+    }
+  }
+
+  /** A built-in template that the metalake does not list yet must be registered exactly once. */
+  @Test
+  public void testReconcileBuiltInJobTemplatesNewRegistration() {
+    String metalakeName = "test_metalake";
+
+    ShellJobTemplate freshTemplate =
+        ShellJobTemplate.builder()
+            .withName("builtin-new")
+            .withComment("new template")
+            .withExecutable("/bin/echo")
+            .withCustomFields(Collections.singletonMap("version", "v1"))
+            .build();
+    Map<String, JobTemplate> builtInTemplates = new HashMap<>();
+    builtInTemplates.put("builtin-new", freshTemplate);
+
+    // The metalake currently has no job templates at all.
+    when(jobManager.listJobTemplates(metalakeName)).thenReturn(Collections.emptyList());
+    doNothing().when(jobManager).registerJobTemplate(eq(metalakeName), any());
+
+    listener.reconcileBuiltInJobTemplates(metalakeName, builtInTemplates);
+
+    verify(jobManager, times(1)).registerJobTemplate(eq(metalakeName), any());
+  }
+
+  /** A stored v1 template must be replaced when the provider offers the same name with v2. */
+  @Test
+  public void testReconcileBuiltInJobTemplatesUpdate() throws IOException {
+    String metalakeName = "test_metalake";
+
+    // Create and store the existing (v1) entity in the real entityStore
+    JobTemplateEntity storedEntity = createJobTemplateEntity("builtin-existing", "v1");
+    entityStore.put(storedEntity, false);
+    when(jobManager.listJobTemplates(metalakeName))
+        .thenReturn(Collections.singletonList(storedEntity));
+
+    // The provider now ships the same template name with a newer version.
+    ShellJobTemplate newerTemplate =
+        ShellJobTemplate.builder()
+            .withName("builtin-existing")
+            .withComment("updated")
+            .withExecutable("/bin/echo")
+            .withCustomFields(Collections.singletonMap("version", "v2"))
+            .build();
+    Map<String, JobTemplate> builtInTemplates = new HashMap<>();
+    builtInTemplates.put("builtin-existing", newerTemplate);
+
+    listener.reconcileBuiltInJobTemplates(metalakeName, builtInTemplates);
+
+    // Verify the entity was updated by loading it back from the store
+    JobTemplateEntity reloaded =
+        entityStore.get(
+            storedEntity.nameIdentifier(), Entity.EntityType.JOB_TEMPLATE, JobTemplateEntity.class);
+    Assertions.assertEquals("v2", reloaded.templateContent().customFields().get("version"));
+  }
+
+  /** When stored and provided versions match, nothing is registered and the entity stays v1. */
+  @Test
+  public void testReconcileBuiltInJobTemplatesNoUpdateWhenVersionSame() throws IOException {
+    String metalakeName = "test_metalake";
+
+    // Create and store the existing entity
+    JobTemplateEntity storedEntity = createJobTemplateEntity("builtin-existing", "v1");
+    entityStore.put(storedEntity, false);
+    when(jobManager.listJobTemplates(metalakeName))
+        .thenReturn(Collections.singletonList(storedEntity));
+
+    // The provider ships the very same version that is already stored.
+    ShellJobTemplate sameVersionTemplate =
+        ShellJobTemplate.builder()
+            .withName("builtin-existing")
+            .withComment("same version")
+            .withExecutable("/bin/echo")
+            .withCustomFields(Collections.singletonMap("version", "v1"))
+            .build();
+    Map<String, JobTemplate> builtInTemplates = new HashMap<>();
+    builtInTemplates.put("builtin-existing", sameVersionTemplate);
+
+    listener.reconcileBuiltInJobTemplates(metalakeName, builtInTemplates);
+
+    // Should not update or register
+    verify(jobManager, times(0)).registerJobTemplate(any(), any());
+
+    // Verify entity version is still v1
+    JobTemplateEntity reloaded =
+        entityStore.get(
+            storedEntity.nameIdentifier(), Entity.EntityType.JOB_TEMPLATE, JobTemplateEntity.class);
+    Assertions.assertEquals("v1", reloaded.templateContent().customFields().get("version"));
+  }
+
+  /** A stored built-in template that the provider no longer ships must be deleted. */
+  @Test
+  public void testReconcileBuiltInJobTemplatesDeleteObsolete() {
+    String metalakeName = "test_metalake";
+    // Empty - the provider offers no templates
+    Map<String, JobTemplate> builtInTemplates = new HashMap<>();
+
+    JobTemplateEntity staleEntity = createJobTemplateEntity("builtin-obsolete", "v1");
+    when(jobManager.listJobTemplates(metalakeName))
+        .thenReturn(Collections.singletonList(staleEntity));
+    doReturn(true).when(jobManager).deleteJobTemplate(metalakeName, "builtin-obsolete");
+
+    listener.reconcileBuiltInJobTemplates(metalakeName, builtInTemplates);
+
+    // Should delete obsolete template
+    verify(jobManager, times(1)).deleteJobTemplate(metalakeName, "builtin-obsolete");
+  }
+
+  /**
+   * Verifies that a {@link JobTemplateAlreadyExistsException} raised while registering a built-in
+   * template is absorbed by the reconciliation instead of propagating to the caller.
+   */
+  @Test
+  public void testReconcileHandlesRegistrationException() {
+    String metalakeName = "test_metalake";
+    Map<String, JobTemplate> builtInTemplates = new HashMap<>();
+
+    ShellJobTemplate template =
+        ShellJobTemplate.builder()
+            .withName("builtin-new")
+            .withComment("new")
+            .withExecutable("/bin/echo")
+            .withCustomFields(Collections.singletonMap("version", "v1"))
+            .build();
+    builtInTemplates.put("builtin-new", template);
+
+    when(jobManager.listJobTemplates(metalakeName)).thenReturn(Collections.emptyList());
+    Mockito.doThrow(new JobTemplateAlreadyExistsException("Already exists"))
+        .when(jobManager)
+        .registerJobTemplate(eq(metalakeName), any());
+
+    // Should not throw exception
+    Assertions.assertDoesNotThrow(
+        () -> listener.reconcileBuiltInJobTemplates(metalakeName, builtInTemplates));
+
+    // Without this check the test would also pass if registration were never attempted, in
+    // which case the exception-handling path would not have been exercised at all.
+    verify(jobManager, Mockito.atLeastOnce()).registerJobTemplate(eq(metalakeName), any());
+  }
+
+  /**
+   * Stores a metalake entity with the given name in the test entity store.
+   *
+   * @param name the metalake name
+   * @param inUse value written to the {@code PROPERTY_IN_USE} property
+   * @throws IOException if the entity store rejects the write
+   */
+  private void createMetalake(String name, boolean inUse) throws IOException {
+    Map<String, String> properties = new HashMap<>();
+    properties.put(PROPERTY_IN_USE, String.valueOf(inUse));
+
+    AuditInfo auditInfo =
+        AuditInfo.builder().withCreator("test").withCreateTime(Instant.now()).build();
+
+    entityStore.put(
+        BaseMetalake.builder()
+            .withId(idGenerator.nextId())
+            .withName(name)
+            .withComment("test metalake")
+            .withProperties(properties)
+            .withVersion(SchemaVersion.V_0_1)
+            .withAuditInfo(auditInfo)
+            .build(),
+        false);
+  }
+
+  /**
+   * Builds a shell job template entity in the "test_metalake" job template namespace.
+   *
+   * @param name the template name
+   * @param version value stored under the "version" custom field
+   * @return the entity; it is not written to any store by this helper
+   */
+  private JobTemplateEntity createJobTemplateEntity(String name, String version) {
+    JobTemplateEntity.TemplateContent content =
+        JobTemplateEntity.TemplateContent.fromJobTemplate(
+            ShellJobTemplate.builder()
+                .withName(name)
+                .withComment("test")
+                .withExecutable("/bin/echo")
+                .withCustomFields(Collections.singletonMap("version", version))
+                .build());
+
+    AuditInfo auditInfo =
+        AuditInfo.builder().withCreator("test").withCreateTime(Instant.now()).build();
+
+    return JobTemplateEntity.builder()
+        .withId(1L)
+        .withName(name)
+        .withNamespace(NamespaceUtil.ofJobTemplate("test_metalake"))
+        .withTemplateContent(content)
+        .withAuditInfo(auditInfo)
+        .build();
+  }
+}
diff --git a/core/src/test/java/org/apache/gravitino/job/TestJobManager.java
b/core/src/test/java/org/apache/gravitino/job/TestJobManager.java
index 4e669bec79..48f15f2b67 100644
--- a/core/src/test/java/org/apache/gravitino/job/TestJobManager.java
+++ b/core/src/test/java/org/apache/gravitino/job/TestJobManager.java
@@ -576,6 +576,12 @@ public class TestJobManager {
.build();
when(entityStore.list(Namespace.empty(), BaseMetalake.class,
Entity.EntityType.METALAKE))
.thenReturn(ImmutableList.of(mockMetalake));
+
+ // Mock MetalakeManager.listInUseMetalakes to return the test metalake
+ mockedMetalake
+ .when(() -> MetalakeManager.listInUseMetalakes(entityStore))
+ .thenReturn(ImmutableList.of(metalake));
+
when(jobManager.listJobs(metalake,
Optional.empty())).thenReturn(ImmutableList.of(job));
when(jobExecutor.getJobStatus(job.jobExecutionId())).thenReturn(JobHandle.Status.QUEUED);
@@ -600,6 +606,11 @@ public class TestJobManager {
when(entityStore.list(Namespace.empty(), BaseMetalake.class,
Entity.EntityType.METALAKE))
.thenReturn(ImmutableList.of(mockMetalake));
+ // Mock MetalakeManager.listInUseMetalakes to return the test metalake
+ mockedMetalake
+ .when(() -> MetalakeManager.listInUseMetalakes(entityStore))
+ .thenReturn(ImmutableList.of(metalake));
+
when(jobManager.listJobs(metalake,
Optional.empty())).thenReturn(ImmutableList.of(job));
Assertions.assertDoesNotThrow(() -> jobManager.cleanUpStagingDirs());
verify(entityStore, never()).delete(any(), any());
diff --git
a/core/src/test/java/org/apache/gravitino/job/TestJobTemplateValidationDispatcher.java
b/core/src/test/java/org/apache/gravitino/job/TestJobTemplateValidationDispatcher.java
new file mode 100644
index 0000000000..0f27df37cb
--- /dev/null
+++
b/core/src/test/java/org/apache/gravitino/job/TestJobTemplateValidationDispatcher.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.job;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.time.Instant;
+import java.util.Collections;
+import java.util.Optional;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.exceptions.IllegalJobTemplateOperationException;
+import org.apache.gravitino.exceptions.JobTemplateAlreadyExistsException;
+import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.meta.JobTemplateEntity;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Unit tests for {@link JobTemplateValidationDispatcher}: operations that touch names carrying the
+ * reserved built-in prefix must be rejected before reaching the wrapped dispatcher, everything
+ * else must be delegated unchanged.
+ */
+public class TestJobTemplateValidationDispatcher {
+
+  private static final String METALAKE = "metalake1";
+
+  private JobOperationDispatcher mockDispatcher;
+  private JobTemplateValidationDispatcher validationDispatcher;
+
+  @BeforeEach
+  public void setUp() {
+    mockDispatcher = mock(JobOperationDispatcher.class);
+    validationDispatcher = new JobTemplateValidationDispatcher(mockDispatcher);
+  }
+
+  @Test
+  public void testIsBuiltInTemplateName() {
+    // Test built-in template names
+    for (String builtIn : new String[] {"builtin-spark-pi", "builtin-test", "builtin-"}) {
+      assertTrue(JobTemplateValidationDispatcher.isBuiltInTemplateName(builtIn));
+    }
+
+    // Test non-built-in template names
+    String[] userNames = {"my-template", "user-template", "BUILTIN-test", "", null};
+    for (String userName : userNames) {
+      assertFalse(JobTemplateValidationDispatcher.isBuiltInTemplateName(userName));
+    }
+  }
+
+  @Test
+  public void testRegisterJobTemplateWithBuiltInName() {
+    JobTemplateEntity reservedEntity = createJobTemplateEntity("builtin-test-template");
+
+    IllegalJobTemplateOperationException thrown =
+        assertThrows(
+            IllegalJobTemplateOperationException.class,
+            () -> validationDispatcher.registerJobTemplate(METALAKE, reservedEntity));
+
+    String message = thrown.getMessage();
+    assertTrue(message.contains("builtin-test-template"));
+    assertTrue(message.contains("reserved for built-in templates"));
+    verify(mockDispatcher, never()).registerJobTemplate(anyString(), any());
+  }
+
+  @Test
+  public void testRegisterJobTemplateWithUserName() throws JobTemplateAlreadyExistsException {
+    JobTemplateEntity userEntity = createJobTemplateEntity("my-custom-template");
+    doNothing().when(mockDispatcher).registerJobTemplate(eq(METALAKE), eq(userEntity));
+
+    assertDoesNotThrow(() -> validationDispatcher.registerJobTemplate(METALAKE, userEntity));
+
+    verify(mockDispatcher).registerJobTemplate(METALAKE, userEntity);
+  }
+
+  @Test
+  public void testDeleteBuiltInJobTemplate() {
+    IllegalJobTemplateOperationException thrown =
+        assertThrows(
+            IllegalJobTemplateOperationException.class,
+            () -> validationDispatcher.deleteJobTemplate(METALAKE, "builtin-spark-pi"));
+
+    String message = thrown.getMessage();
+    assertTrue(message.contains("Cannot delete"));
+    assertTrue(message.contains("builtin-spark-pi"));
+    assertTrue(message.contains("managed by the system"));
+    verify(mockDispatcher, never()).deleteJobTemplate(anyString(), anyString());
+  }
+
+  @Test
+  public void testDeleteUserJobTemplate() {
+    when(mockDispatcher.deleteJobTemplate(METALAKE, "my-template")).thenReturn(true);
+
+    boolean deleted = validationDispatcher.deleteJobTemplate(METALAKE, "my-template");
+
+    assertTrue(deleted);
+    verify(mockDispatcher).deleteJobTemplate(METALAKE, "my-template");
+  }
+
+  @Test
+  public void testAlterBuiltInJobTemplate() {
+    JobTemplateChange commentChange = JobTemplateChange.updateComment("new comment");
+
+    IllegalJobTemplateOperationException thrown =
+        assertThrows(
+            IllegalJobTemplateOperationException.class,
+            () ->
+                validationDispatcher.alterJobTemplate(
+                    METALAKE, "builtin-spark-pi", commentChange));
+
+    String message = thrown.getMessage();
+    assertTrue(message.contains("Cannot alter"));
+    assertTrue(message.contains("builtin-spark-pi"));
+    verify(mockDispatcher, never())
+        .alterJobTemplate(anyString(), anyString(), any(JobTemplateChange[].class));
+  }
+
+  @Test
+  public void testAlterUserJobTemplate() {
+    JobTemplateEntity expected = createJobTemplateEntity("my-template");
+    JobTemplateChange commentChange = JobTemplateChange.updateComment("new comment");
+    when(mockDispatcher.alterJobTemplate(METALAKE, "my-template", commentChange))
+        .thenReturn(expected);
+
+    JobTemplateEntity altered =
+        validationDispatcher.alterJobTemplate(METALAKE, "my-template", commentChange);
+
+    assertEquals(expected, altered);
+    verify(mockDispatcher).alterJobTemplate(METALAKE, "my-template", commentChange);
+  }
+
+  @Test
+  public void testRenameToBuiltInName() {
+    JobTemplateChange renameChange = JobTemplateChange.rename("builtin-new-name");
+
+    IllegalJobTemplateOperationException thrown =
+        assertThrows(
+            IllegalJobTemplateOperationException.class,
+            () -> validationDispatcher.alterJobTemplate(METALAKE, "my-template", renameChange));
+
+    String message = thrown.getMessage();
+    assertTrue(message.contains("builtin-new-name"));
+    assertTrue(message.contains("reserved for built-in templates"));
+    verify(mockDispatcher, never())
+        .alterJobTemplate(anyString(), anyString(), any(JobTemplateChange[].class));
+  }
+
+  @Test
+  public void testRenameToUserName() {
+    JobTemplateEntity expected = createJobTemplateEntity("new-user-name");
+    JobTemplateChange renameChange = JobTemplateChange.rename("new-user-name");
+    when(mockDispatcher.alterJobTemplate(METALAKE, "old-name", renameChange))
+        .thenReturn(expected);
+
+    JobTemplateEntity altered =
+        validationDispatcher.alterJobTemplate(METALAKE, "old-name", renameChange);
+
+    assertEquals(expected, altered);
+    verify(mockDispatcher).alterJobTemplate(METALAKE, "old-name", renameChange);
+  }
+
+  @Test
+  public void testMultipleChangesWithBuiltInRename() {
+    JobTemplateChange commentChange = JobTemplateChange.updateComment("new comment");
+    JobTemplateChange renameChange = JobTemplateChange.rename("builtin-new-name");
+
+    IllegalJobTemplateOperationException thrown =
+        assertThrows(
+            IllegalJobTemplateOperationException.class,
+            () ->
+                validationDispatcher.alterJobTemplate(
+                    METALAKE, "my-template", commentChange, renameChange));
+
+    assertTrue(thrown.getMessage().contains("builtin-new-name"));
+    verify(mockDispatcher, never())
+        .alterJobTemplate(anyString(), anyString(), any(JobTemplateChange[].class));
+  }
+
+  // The remaining tests only check that the call is forwarded with the same arguments.
+
+  @Test
+  public void testListJobTemplates() {
+    validationDispatcher.listJobTemplates(METALAKE);
+    verify(mockDispatcher).listJobTemplates(METALAKE);
+  }
+
+  @Test
+  public void testGetJobTemplate() {
+    validationDispatcher.getJobTemplate(METALAKE, "any-template");
+    verify(mockDispatcher).getJobTemplate(METALAKE, "any-template");
+  }
+
+  @Test
+  public void testListJobs() {
+    validationDispatcher.listJobs(METALAKE, Optional.empty());
+    verify(mockDispatcher).listJobs(METALAKE, Optional.empty());
+  }
+
+  @Test
+  public void testGetJob() {
+    validationDispatcher.getJob(METALAKE, "job-123");
+    verify(mockDispatcher).getJob(METALAKE, "job-123");
+  }
+
+  @Test
+  public void testRunJob() {
+    validationDispatcher.runJob(METALAKE, "my-template", Collections.emptyMap());
+    verify(mockDispatcher).runJob(METALAKE, "my-template", Collections.emptyMap());
+  }
+
+  @Test
+  public void testCancelJob() {
+    validationDispatcher.cancelJob(METALAKE, "job-123");
+    verify(mockDispatcher).cancelJob(METALAKE, "job-123");
+  }
+
+  /** Builds a minimal Spark job template entity with the given name for use as a fixture. */
+  private JobTemplateEntity createJobTemplateEntity(String name) {
+    JobTemplateEntity.TemplateContent content =
+        JobTemplateEntity.TemplateContent.builder()
+            .withJobType(JobTemplate.JobType.SPARK)
+            .withExecutable("spark-submit")
+            .withArguments(Collections.emptyList())
+            .withEnvironments(Collections.emptyMap())
+            .withCustomFields(Collections.emptyMap())
+            .withClassName("com.example.Main")
+            .withJars(Collections.emptyList())
+            .withFiles(Collections.emptyList())
+            .withArchives(Collections.emptyList())
+            .withConfigs(Collections.emptyMap())
+            .build();
+
+    AuditInfo auditInfo =
+        AuditInfo.builder().withCreator("test").withCreateTime(Instant.now()).build();
+
+    return JobTemplateEntity.builder()
+        .withId(1L)
+        .withName(name)
+        .withNamespace(Namespace.of(METALAKE))
+        .withComment("test template")
+        .withTemplateContent(content)
+        .withAuditInfo(auditInfo)
+        .build();
+  }
+}
diff --git
a/core/src/test/java/org/apache/gravitino/metalake/TestMetalakeManager.java
b/core/src/test/java/org/apache/gravitino/metalake/TestMetalakeManager.java
index 03624f28da..4a91539d4f 100644
--- a/core/src/test/java/org/apache/gravitino/metalake/TestMetalakeManager.java
+++ b/core/src/test/java/org/apache/gravitino/metalake/TestMetalakeManager.java
@@ -26,6 +26,7 @@ import static org.mockito.Mockito.doReturn;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import java.io.IOException;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.gravitino.Config;
@@ -209,6 +210,35 @@ public class TestMetalakeManager {
Assertions.assertFalse(dropped1, "metalake should be non-existent");
}
+  /**
+   * Creates three metalakes, disables one of them and checks that {@code listInUseMetalakes}
+   * reports only the two that are still enabled.
+   */
+  @Test
+  public void testListInUseMetalakes() {
+    // Create some metalakes with different in-use status
+    NameIdentifier ident1 = NameIdentifier.of("metalake1");
+    NameIdentifier ident2 = NameIdentifier.of("metalake2");
+    NameIdentifier ident3 = NameIdentifier.of("metalake3");
+
+    try {
+      // Create metalakes - by default they should be in-use (PROPERTY_IN_USE defaults to true)
+      metalakeManager.createMetalake(ident1, "comment1", ImmutableMap.of());
+      metalakeManager.createMetalake(ident2, "comment2", ImmutableMap.of());
+      metalakeManager.createMetalake(ident3, "comment3", ImmutableMap.of());
+
+      // Disable metalake2
+      metalakeManager.disableMetalake(ident2);
+
+      // List in-use metalakes
+      List<String> inUseMetalakes = MetalakeManager.listInUseMetalakes(entityStore);
+
+      // Should contain metalake1 and metalake3, but not metalake2
+      Assertions.assertTrue(inUseMetalakes.contains("metalake1"));
+      Assertions.assertFalse(inUseMetalakes.contains("metalake2"));
+      Assertions.assertTrue(inUseMetalakes.contains("metalake3"));
+    } finally {
+      // Cleanup runs even when an assertion above fails, so that no metalake created here is
+      // left behind in the entity store for the other tests of this class.
+      metalakeManager.dropMetalake(ident1, true);
+      metalakeManager.dropMetalake(ident2, true);
+      metalakeManager.dropMetalake(ident3, true);
+    }
+  }
+
private void testProperties(Map<String, String> expectedProps, Map<String,
String> testProps) {
expectedProps.forEach(
(k, v) -> {
diff --git a/maintenance/jobs/build.gradle.kts
b/maintenance/jobs/build.gradle.kts
new file mode 100644
index 0000000000..9a0266f7d7
--- /dev/null
+++ b/maintenance/jobs/build.gradle.kts
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar
+
+// Build script of the built-in jobs module: compiles the job classes and packages them with
+// the Shadow plugin.
+plugins {
+ `maven-publish`
+ id("java")
+ alias(libs.plugins.shadow)
+}
+
+repositories {
+ mavenCentral()
+}
+
+// Scala version: the `scalaVersion` project property when it is a String, otherwise the
+// `defaultScalaVersion` extra property.
+val scalaVersion: String = project.properties["scalaVersion"] as? String ?:
extra["defaultScalaVersion"].toString()
+// Spark version is read from the `spark35` entry of the version catalog.
+val sparkVersion: String = libs.versions.spark35.get()
+
+dependencies {
+ // compileOnly dependencies are needed to compile but are not part of the runtime classpath.
+ // NOTE(review): presumably the Gravitino API, slf4j and Spark are supplied by the
+ // environment that runs the jobs - confirm.
+ compileOnly(project(":api"))
+
+ compileOnly(libs.slf4j.api)
+ compileOnly("org.apache.spark:spark-sql_$scalaVersion:$sparkVersion") {
+ exclude("org.slf4j")
+ exclude("org.apache.logging.log4j")
+ }
+
+ // Tests need the API on their classpath because it is only compileOnly for main sources.
+ testImplementation(project(":api"))
+ testImplementation(libs.bundles.log4j)
+ testImplementation(libs.junit.jupiter.api)
+ testRuntimeOnly(libs.junit.jupiter.engine)
+}
+
+tasks.test {
+ useJUnitPlatform()
+}
+
+// The shadow jar is published without a classifier and keeps Spark, slf4j-api and log4j out
+// of the archive; service descriptor files are merged.
+tasks.withType(ShadowJar::class.java) {
+ isZip64 = true
+ archiveClassifier.set("")
+ mergeServiceFiles()
+
+ dependencies {
+ exclude(dependency("org.apache.spark:.*"))
+ exclude(dependency("org.slf4j:slf4j-api"))
+ exclude(dependency("org.apache.logging.log4j:.*"))
+ }
+}
+
+// The plain jar is moved to the "empty" classifier, which keeps its file name distinct from
+// the unclassified shadow jar; building it also triggers the shadow jar.
+tasks.jar {
+ dependsOn(tasks.named("shadowJar"))
+ archiveClassifier.set("empty")
+}
diff --git
a/maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/BuiltInJob.java
b/maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/BuiltInJob.java
new file mode 100644
index 0000000000..9f8bb59956
--- /dev/null
+++
b/maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/BuiltInJob.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.maintenance.jobs;
+
+import java.net.URISyntaxException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.security.CodeSource;
+import org.apache.gravitino.job.JobTemplate;
+
+/** Contract for built-in jobs to expose their job template definitions. */
+public interface BuiltInJob {
+
+  /**
+   * Returns the built-in job template provided by this job.
+   *
+   * @return the job template describing this built-in job
+   */
+  JobTemplate jobTemplate();
+
+  /**
+   * Resolves the absolute path of the code source (normally the jar hosting the built-in jobs)
+   * from which the given class was loaded.
+   *
+   * @param cls the class whose code source location should be resolved
+   * @return the absolute filesystem path of the code source location of {@code cls}
+   * @throws RuntimeException if the class has no known code source location, or if that location
+   *     cannot be converted into a path on an available file system
+   */
+  default String resolveExecutable(Class<?> cls) {
+    CodeSource codeSource = cls.getProtectionDomain().getCodeSource();
+    if (codeSource == null || codeSource.getLocation() == null) {
+      throw new RuntimeException("Failed to resolve built-in jobs jar location");
+    }
+
+    try {
+      Path path = Paths.get(codeSource.getLocation().toURI());
+      return path.toAbsolutePath().toString();
+    } catch (URISyntaxException
+        | IllegalArgumentException
+        | java.nio.file.FileSystemNotFoundException e) {
+      // Paths.get(URI) rejects locations that do not map to an installed file system (for
+      // example a nested or non-file URI); report them like any other resolution failure and
+      // name the offending location so the problem can be diagnosed.
+      throw new RuntimeException(
+          "Failed to resolve built-in jobs jar location: " + codeSource.getLocation(), e);
+    }
+  }
+}
diff --git
a/maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/BuiltInJobTemplateProvider.java
b/maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/BuiltInJobTemplateProvider.java
new file mode 100644
index 0000000000..3758fe8d26
--- /dev/null
+++
b/maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/BuiltInJobTemplateProvider.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.maintenance.jobs;
+
+import com.google.common.collect.ImmutableList;
+import java.util.List;
+import java.util.Optional;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import org.apache.gravitino.job.JobTemplate;
+import org.apache.gravitino.job.JobTemplateProvider;
+import org.apache.gravitino.maintenance.jobs.spark.SparkPiJob;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link JobTemplateProvider} implementation exposing the job templates that ship with the
+ * Gravitino jobs module. Templates with an illegal name or version are logged and dropped.
+ */
+public class BuiltInJobTemplateProvider implements JobTemplateProvider {
+
+  private static final Logger LOG = LoggerFactory.getLogger(BuiltInJobTemplateProvider.class);
+
+  private static final Pattern NAME_PATTERN =
+      Pattern.compile(JobTemplateProvider.BUILTIN_NAME_PATTERN);
+  private static final Pattern VERSION_PATTERN =
+      Pattern.compile(JobTemplateProvider.VERSION_VALUE_PATTERN);
+
+  private static final List<BuiltInJob> BUILT_IN_JOBS = ImmutableList.of(new SparkPiJob());
+
+  /**
+   * Collects the templates of all bundled built-in jobs, keeping only the valid ones.
+   *
+   * @return the templates whose name and version both match the provider patterns
+   */
+  @Override
+  public List<? extends JobTemplate> jobTemplates() {
+    return BUILT_IN_JOBS.stream()
+        .map(job -> job.jobTemplate())
+        .filter(template -> isValid(template))
+        .collect(Collectors.toList());
+  }
+
+  /** Checks the name first and the version second; the first failing rule is logged. */
+  private boolean isValid(JobTemplate template) {
+    boolean legalName = NAME_PATTERN.matcher(template.name()).matches();
+    if (!legalName) {
+      LOG.warn("Skip built-in job template with illegal name: {}", template.name());
+      return false;
+    }
+
+    // A missing custom-field map and a missing version entry are both treated as "no version".
+    String version =
+        Optional.ofNullable(template.customFields())
+            .map(fields -> fields.get(JobTemplateProvider.PROPERTY_VERSION_KEY))
+            .orElse(null);
+    boolean legalVersion = version != null && VERSION_PATTERN.matcher(version).matches();
+    if (!legalVersion) {
+      LOG.warn("Skip built-in job template {} without valid version", template.name());
+      return false;
+    }
+
+    return true;
+  }
+}
diff --git
a/maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/spark/SparkPiJob.java
b/maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/spark/SparkPiJob.java
new file mode 100644
index 0000000000..6ff35181ea
--- /dev/null
+++
b/maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/spark/SparkPiJob.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.maintenance.jobs.spark;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.gravitino.job.JobTemplateProvider;
+import org.apache.gravitino.job.SparkJobTemplate;
+import org.apache.gravitino.maintenance.jobs.BuiltInJob;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SparkSession;
+
+/**
+ * Self-contained Spark Pi program for the built-in SparkPi job template.
+ *
+ * <p>This avoids depending on the Spark examples jar so the built-in template can run with only the
+ * Gravitino-provided jobs artifact on the classpath.
+ */
+public class SparkPiJob implements BuiltInJob {
+
+  private static final String NAME = JobTemplateProvider.BUILTIN_NAME_PREFIX + "sparkpi";
+  // Bump VERSION whenever SparkPi template behavior changes (name/executable/class/args/configs).
+  private static final String VERSION = "v1";
+
+  /** Number of slices used when none, or an unparsable value, is passed on the command line. */
+  private static final int DEFAULT_SLICES = 2;
+  /** Number of random points sampled per slice. */
+  private static final int SAMPLES_PER_SLICE = 100000;
+
+  /**
+   * Builds the SparkPi job template. The executable is the code source location of this class,
+   * the single argument is the {@code {{slices}}} placeholder and the Spark settings are
+   * placeholders as well.
+   *
+   * @return the built-in SparkPi job template
+   */
+  @Override
+  public SparkJobTemplate jobTemplate() {
+    return SparkJobTemplate.builder()
+        .withName(NAME)
+        .withComment("Built-in SparkPi job template")
+        .withExecutable(resolveExecutable(SparkPiJob.class))
+        .withClassName(SparkPiJob.class.getName())
+        .withArguments(Collections.singletonList("{{slices}}"))
+        .withConfigs(buildSparkConfigs())
+        .withCustomFields(
+            Collections.singletonMap(JobTemplateProvider.PROPERTY_VERSION_KEY, VERSION))
+        .build();
+  }
+
+  /**
+   * Estimates Pi with a Monte Carlo simulation and prints the result to standard output.
+   *
+   * @param args optional; {@code args[0]} is the number of slices (partitions). Values below 1 are
+   *     raised to 1 and unparsable values fall back to the default of 2.
+   */
+  public static void main(String[] args) {
+    int slices = parseSlices(args);
+
+    // Multiply as long and cap the result: a plain int product overflows for large slice counts
+    // and would yield a negative (i.e. empty) sample range.
+    int samples = (int) Math.min((long) slices * SAMPLES_PER_SLICE, Integer.MAX_VALUE);
+
+    SparkSession spark =
+        SparkSession.builder()
+            .appName("Gravitino Built-in SparkPi")
+            // Rely on external cluster/master configuration
+            .getOrCreate();
+
+    try {
+      JavaSparkContext jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());
+      // The same clamped slice count is used for the partitioning, since Spark rejects a
+      // non-positive number of partitions.
+      JavaRDD<Integer> rdd =
+          jsc.parallelize(IntStream.range(0, samples).boxed().collect(Collectors.toList()), slices);
+      long count =
+          rdd.filter(
+                  i -> {
+                    // Random point in the square [-1, 1) x [-1, 1); keep it when it lies inside
+                    // the unit circle.
+                    double x = Math.random() * 2 - 1;
+                    double y = Math.random() * 2 - 1;
+                    return x * x + y * y <= 1;
+                  })
+              .count();
+
+      double pi = 4.0 * count / samples;
+      System.out.printf("Pi is roughly %.5f%n", pi);
+    } finally {
+      // Release the session even when the computation fails.
+      spark.stop();
+    }
+  }
+
+  /** Reads the slice count from the first argument; the result is always at least 1. */
+  private static int parseSlices(String[] args) {
+    if (args.length == 0) {
+      return DEFAULT_SLICES;
+    }
+
+    try {
+      return Math.max(Integer.parseInt(args[0].trim()), 1);
+    } catch (NumberFormatException e) {
+      System.err.println("Invalid number of slices provided. Using default value of 2.");
+      return DEFAULT_SLICES;
+    }
+  }
+
+  /** Returns the Spark settings of the template; every value is a placeholder. */
+  private Map<String, String> buildSparkConfigs() {
+    Map<String, String> configs = new HashMap<>();
+    configs.put("spark.master", "{{spark_master}}");
+    configs.put("spark.executor.instances", "{{spark_executor_instances}}");
+    configs.put("spark.executor.cores", "{{spark_executor_cores}}");
+    configs.put("spark.executor.memory", "{{spark_executor_memory}}");
+    configs.put("spark.driver.memory", "{{spark_driver_memory}}");
+    return Collections.unmodifiableMap(configs);
+  }
+}
diff --git
a/maintenance/jobs/src/main/resources/META-INF/services/org.apache.gravitino.job.JobTemplateProvider
b/maintenance/jobs/src/main/resources/META-INF/services/org.apache.gravitino.job.JobTemplateProvider
new file mode 100644
index 0000000000..7b812fb9cd
--- /dev/null
+++
b/maintenance/jobs/src/main/resources/META-INF/services/org.apache.gravitino.job.JobTemplateProvider
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+org.apache.gravitino.maintenance.jobs.BuiltInJobTemplateProvider
diff --git
a/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/TestBuiltInJob.java
b/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/TestBuiltInJob.java
new file mode 100644
index 0000000000..1660698b06
--- /dev/null
+++
b/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/TestBuiltInJob.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.maintenance.jobs;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.apache.gravitino.job.JobTemplate;
+import org.junit.jupiter.api.Test;
+
+/** Tests for {@code resolveExecutable} as reached through a minimal {@link BuiltInJob}. */
+public class TestBuiltInJob {
+
+  /** Bare-bones implementation; only {@code resolveExecutable} is exercised. */
+  private static class TestBuiltInJobImpl implements BuiltInJob {
+    @Override
+    public JobTemplate jobTemplate() {
+      return null; // Not needed for this test
+    }
+  }
+
+  /** Resolves the executable for the test implementation class. */
+  private static String resolveTestExecutable() {
+    BuiltInJob job = new TestBuiltInJobImpl();
+    return job.resolveExecutable(TestBuiltInJobImpl.class);
+  }
+
+  @Test
+  public void testResolveExecutableReturnsValidPath() {
+    String resolved = resolveTestExecutable();
+
+    assertNotNull(resolved, "Executable path should not be null");
+    boolean blank = resolved.trim().isEmpty();
+    assertFalse(blank, "Executable path should not be empty");
+  }
+
+  @Test
+  public void testResolveExecutablePointsToJarOrClassDirectory() {
+    String resolved = resolveTestExecutable();
+
+    // A jar file, or a directory of compiled classes ("test-classes" also contains "classes").
+    boolean isJar = resolved.endsWith(".jar");
+    boolean isClassesDir = resolved.contains("classes");
+    assertTrue(
+        isJar || isClassesDir,
+        "Executable should point to a jar file or classes directory: " + resolved);
+  }
+}
diff --git
a/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/TestBuiltInJobTemplateProvider.java
b/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/TestBuiltInJobTemplateProvider.java
new file mode 100644
index 0000000000..f2cf0f6937
--- /dev/null
+++
b/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/TestBuiltInJobTemplateProvider.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.maintenance.jobs;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.List;
+import org.apache.gravitino.job.JobTemplate;
+import org.apache.gravitino.job.JobTemplateProvider;
+import org.junit.jupiter.api.Test;
+
+/** Tests for the job templates returned by {@link BuiltInJobTemplateProvider}. */
+public class TestBuiltInJobTemplateProvider {
+
+  /** Creates a fresh provider and returns the templates it reports. */
+  private static List<? extends JobTemplate> loadTemplates() {
+    return new BuiltInJobTemplateProvider().jobTemplates();
+  }
+
+  @Test
+  public void testJobTemplatesReturnsNonEmptyList() {
+    List<? extends JobTemplate> provided = loadTemplates();
+
+    assertNotNull(provided, "Job templates list should not be null");
+    assertFalse(provided.isEmpty(), "Should provide at least one built-in job template");
+  }
+
+  @Test
+  public void testAllTemplatesHaveBuiltInPrefix() {
+    loadTemplates()
+        .forEach(
+            template -> {
+              String name = template.name();
+              assertTrue(
+                  name.startsWith(JobTemplateProvider.BUILTIN_NAME_PREFIX),
+                  "Template name should start with builtin- prefix: " + name);
+            });
+  }
+
+  @Test
+  public void testAllTemplatesHaveValidVersion() {
+    for (JobTemplate template : loadTemplates()) {
+      String version = template.customFields().get(JobTemplateProvider.PROPERTY_VERSION_KEY);
+      // The null check must come first: the pattern check below dereferences the version.
+      assertNotNull(version, "Template should have version property: " + template.name());
+
+      boolean wellFormed = version.matches(JobTemplateProvider.VERSION_VALUE_PATTERN);
+      assertTrue(wellFormed, "Version should match pattern v\\d+: " + version);
+    }
+  }
+}
diff --git
a/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/spark/TestSparkPiJob.java
b/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/spark/TestSparkPiJob.java
new file mode 100644
index 0000000000..e2afbd2115
--- /dev/null
+++
b/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/spark/TestSparkPiJob.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.maintenance.jobs.spark;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.Map;
+import org.apache.gravitino.job.JobTemplateProvider;
+import org.apache.gravitino.job.SparkJobTemplate;
+import org.junit.jupiter.api.Test;
+
+/** Tests for the {@link SparkJobTemplate} produced by {@link SparkPiJob}. */
+public class TestSparkPiJob {
+
+  /** Spark configuration keys paired with the placeholder each one is expected to carry. */
+  private static final String[][] EXPECTED_CONFIGS = {
+    {"spark.master", "{{spark_master}}"},
+    {"spark.executor.instances", "{{spark_executor_instances}}"},
+    {"spark.executor.cores", "{{spark_executor_cores}}"},
+    {"spark.executor.memory", "{{spark_executor_memory}}"},
+    {"spark.driver.memory", "{{spark_driver_memory}}"},
+  };
+
+  /** Builds the template from a fresh {@link SparkPiJob} instance. */
+  private static SparkJobTemplate newTemplate() {
+    return new SparkPiJob().jobTemplate();
+  }
+
+  @Test
+  public void testJobTemplateHasCorrectName() {
+    SparkJobTemplate built = newTemplate();
+
+    assertNotNull(built);
+    assertEquals("builtin-sparkpi", built.name());
+  }
+
+  @Test
+  public void testJobTemplateHasComment() {
+    SparkJobTemplate built = newTemplate();
+
+    assertNotNull(built.comment());
+    assertFalse(built.comment().trim().isEmpty());
+  }
+
+  @Test
+  public void testJobTemplateHasExecutable() {
+    SparkJobTemplate built = newTemplate();
+
+    assertNotNull(built.executable());
+    assertFalse(built.executable().trim().isEmpty());
+  }
+
+  @Test
+  public void testJobTemplateHasMainClass() {
+    SparkJobTemplate built = newTemplate();
+
+    assertNotNull(built.className());
+    assertEquals(SparkPiJob.class.getName(), built.className());
+  }
+
+  @Test
+  public void testJobTemplateHasArguments() {
+    SparkJobTemplate built = newTemplate();
+
+    assertNotNull(built.arguments());
+    assertEquals(1, built.arguments().size());
+    assertEquals("{{slices}}", built.arguments().get(0));
+  }
+
+  @Test
+  public void testJobTemplateHasSparkConfigs() {
+    Map<String, String> sparkConfigs = newTemplate().configs();
+    assertNotNull(sparkConfigs);
+    assertFalse(sparkConfigs.isEmpty());
+
+    // Verify expected config keys with placeholders
+    for (String[] expected : EXPECTED_CONFIGS) {
+      assertTrue(sparkConfigs.containsKey(expected[0]));
+    }
+
+    // Verify placeholders
+    for (String[] expected : EXPECTED_CONFIGS) {
+      assertEquals(expected[1], sparkConfigs.get(expected[0]));
+    }
+  }
+
+  @Test
+  public void testJobTemplateHasVersion() {
+    Map<String, String> fields = newTemplate().customFields();
+    assertNotNull(fields);
+    assertTrue(fields.containsKey(JobTemplateProvider.PROPERTY_VERSION_KEY));
+
+    String version = fields.get(JobTemplateProvider.PROPERTY_VERSION_KEY);
+    assertEquals("v1", version);
+    assertTrue(version.matches(JobTemplateProvider.VERSION_VALUE_PATTERN));
+  }
+
+  @Test
+  public void testJobTemplateNameMatchesBuiltInPattern() {
+    String templateName = newTemplate().name();
+
+    assertTrue(templateName.matches(JobTemplateProvider.BUILTIN_NAME_PATTERN));
+    assertTrue(templateName.startsWith(JobTemplateProvider.BUILTIN_NAME_PREFIX));
+  }
+
+  @Test
+  public void testResolveExecutableReturnsNonEmptyPath() {
+    String executablePath = newTemplate().executable();
+
+    assertNotNull(executablePath);
+    assertFalse(executablePath.trim().isEmpty());
+  }
+}
diff --git
a/server/src/main/java/org/apache/gravitino/server/web/rest/ExceptionHandlers.java
b/server/src/main/java/org/apache/gravitino/server/web/rest/ExceptionHandlers.java
index d3ddc1087f..a4476d6a7b 100644
---
a/server/src/main/java/org/apache/gravitino/server/web/rest/ExceptionHandlers.java
+++
b/server/src/main/java/org/apache/gravitino/server/web/rest/ExceptionHandlers.java
@@ -28,6 +28,7 @@ import
org.apache.gravitino.exceptions.ConnectionFailedException;
import org.apache.gravitino.exceptions.FilesetAlreadyExistsException;
import org.apache.gravitino.exceptions.ForbiddenException;
import org.apache.gravitino.exceptions.GroupAlreadyExistsException;
+import org.apache.gravitino.exceptions.IllegalJobTemplateOperationException;
import org.apache.gravitino.exceptions.InUseException;
import org.apache.gravitino.exceptions.JobTemplateAlreadyExistsException;
import org.apache.gravitino.exceptions.MetalakeAlreadyExistsException;
@@ -859,7 +860,10 @@ public class ExceptionHandlers {
String errorMsg = getJobTemplateErrorMsg(formatted, op.name(), parent,
getErrorMsg(e));
LOG.warn(errorMsg, e);
- if (e instanceof IllegalArgumentException) {
+ if (e instanceof IllegalJobTemplateOperationException) {
+ return Utils.illegalArguments(errorMsg, e);
+
+ } else if (e instanceof IllegalArgumentException) {
return Utils.illegalArguments(errorMsg, e);
} else if (e instanceof NotFoundException) {
diff --git a/settings.gradle.kts b/settings.gradle.kts
index 1be14c8e1d..75dab2bc0c 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -88,4 +88,4 @@ include(":bundles:azure", ":bundles:azure-bundle",
":bundles:iceberg-azure-bundl
include(":catalogs:hadoop-common")
include(":lineage")
include(":mcp-server")
-include(":maintenance:optimizer")
+include(":maintenance:optimizer", ":maintenance:jobs")