This is an automated email from the ASF dual-hosted git repository.
yuqi4733 pushed a commit to branch branch-1.0
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/branch-1.0 by this push:
new d9f26fcc6b [#8865] improve(core): Add the placeholder support for all
the job template fields (#8936)
d9f26fcc6b is described below
commit d9f26fcc6bd91c9ba9e93a1bb3ad0d9bea0123ab
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Oct 28 20:43:03 2025 +0800
[#8865] improve(core): Add the placeholder support for all the job template
fields (#8936)
### What changes were proposed in this pull request?
This PR adds the placeholder support for all the job template fields to
give users the flexibility to control.
### Why are the changes needed?
To support multi-version job artifacts.
Fix: #8865
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
UTs been added.
Co-authored-by: Jerry Shao <[email protected]>
Co-authored-by: Mini Yu <[email protected]>
---
.../java/org/apache/gravitino/job/JobManager.java | 40 +++++-
.../org/apache/gravitino/job/TestJobTemplate.java | 147 +++++++++++++++++++++
docs/manage-jobs-in-gravitino.md | 45 +++----
3 files changed, 199 insertions(+), 33 deletions(-)
diff --git a/core/src/main/java/org/apache/gravitino/job/JobManager.java
b/core/src/main/java/org/apache/gravitino/job/JobManager.java
index 61033b763d..b6f255ae67 100644
--- a/core/src/main/java/org/apache/gravitino/job/JobManager.java
+++ b/core/src/main/java/org/apache/gravitino/job/JobManager.java
@@ -671,7 +671,9 @@ public class JobManager implements JobOperationDispatcher {
String comment = jobTemplateEntity.comment();
JobTemplateEntity.TemplateContent content =
jobTemplateEntity.templateContent();
- String executable = fetchFileFromUri(content.executable(), stagingDir,
TIMEOUT_IN_MS);
+ String executable =
+ fetchFileFromUri(
+ replacePlaceholder(content.executable(), jobConf), stagingDir,
TIMEOUT_IN_MS);
List<String> args =
content.arguments().stream()
@@ -692,7 +694,13 @@ public class JobManager implements JobOperationDispatcher {
// For shell job template
if (content.jobType() == JobTemplate.JobType.SHELL) {
- List<String> scripts = fetchFilesFromUri(content.scripts(), stagingDir,
TIMEOUT_IN_MS);
+ List<String> scripts =
+ content.scripts().stream()
+ .map(
+ script ->
+ fetchFileFromUri(
+ replacePlaceholder(script, jobConf), stagingDir,
TIMEOUT_IN_MS))
+ .collect(Collectors.toList());
return ShellJobTemplate.builder()
.withName(name)
@@ -707,10 +715,30 @@ public class JobManager implements JobOperationDispatcher
{
// For Spark job template
if (content.jobType() == JobTemplate.JobType.SPARK) {
- String className = content.className();
- List<String> jars = fetchFilesFromUri(content.jars(), stagingDir,
TIMEOUT_IN_MS);
- List<String> files = fetchFilesFromUri(content.files(), stagingDir,
TIMEOUT_IN_MS);
- List<String> archives = fetchFilesFromUri(content.archives(),
stagingDir, TIMEOUT_IN_MS);
+ String className = replacePlaceholder(content.className(), jobConf);
+ List<String> jars =
+ content.jars().stream()
+ .map(
+ jar ->
+ fetchFileFromUri(replacePlaceholder(jar, jobConf),
stagingDir, TIMEOUT_IN_MS))
+ .collect(Collectors.toList());
+
+ List<String> files =
+ content.files().stream()
+ .map(
+ file ->
+ fetchFileFromUri(
+ replacePlaceholder(file, jobConf), stagingDir,
TIMEOUT_IN_MS))
+ .collect(Collectors.toList());
+
+ List<String> archives =
+ content.archives().stream()
+ .map(
+ archive ->
+ fetchFileFromUri(
+ replacePlaceholder(archive, jobConf), stagingDir,
TIMEOUT_IN_MS))
+ .collect(Collectors.toList());
+
Map<String, String> configs =
content.configs().entrySet().stream()
.collect(
diff --git a/core/src/test/java/org/apache/gravitino/job/TestJobTemplate.java
b/core/src/test/java/org/apache/gravitino/job/TestJobTemplate.java
index 7c53f4ec4c..3ce6d1b87c 100644
--- a/core/src/test/java/org/apache/gravitino/job/TestJobTemplate.java
+++ b/core/src/test/java/org/apache/gravitino/job/TestJobTemplate.java
@@ -222,6 +222,61 @@ public class TestJobTemplate {
Assertions.assertTrue(scriptNames.contains(testScript2.getName()));
}
+ @Test
+ public void testCreateShellRuntimeJobTemplateWithReplacementsInScripts()
throws IOException {
+ File testScript1 = Files.createTempFile(tempDir.toPath(), "testScript1",
".sh").toFile();
+ File testScript2 = Files.createTempFile(tempDir.toPath(), "testScript2",
".sh").toFile();
+
+ ShellJobTemplate shellJobTemplate =
+ ShellJobTemplate.builder()
+ .withName("testShellJob1")
+ .withComment("This is a test shell job template")
+ .withExecutable("/bin/echo")
+ .withArguments(Lists.newArrayList("arg1", "arg2", "{{arg3}},
{{arg4}}"))
+ .withEnvironments(ImmutableMap.of("ENV_VAR1", "{{val1}}",
"ENV_VAR2", "{{val2}}"))
+ .withCustomFields(ImmutableMap.of("customField1",
"{{customVal1}}"))
+ .withScripts(
+ Lists.newArrayList(
+ testScript1.toURI().toString().replace("testScript1",
"{{scriptName1}}"),
+ testScript2.toURI().toString().replace("testScript2",
"{{scriptName2}}")))
+ .build();
+
+ JobTemplateEntity entity =
+ JobTemplateEntity.builder()
+ .withId(1L)
+ .withName(shellJobTemplate.name())
+ .withComment(shellJobTemplate.comment())
+ .withNamespace(NamespaceUtil.ofJobTemplate("test"))
+ .withTemplateContent(
+
JobTemplateEntity.TemplateContent.fromJobTemplate(shellJobTemplate))
+ .withAuditInfo(
+
AuditInfo.builder().withCreator("test").withCreateTime(Instant.now()).build())
+ .build();
+
+ JobTemplate result =
+ JobManager.createRuntimeJobTemplate(
+ entity,
+ ImmutableMap.of(
+ "arg3", "value3",
+ "arg4", "value4",
+ "val1", "value1",
+ "val2", "value2",
+ "customVal1", "customValue1",
+ "scriptName1", "testScript1",
+ "scriptName2", "testScript2"),
+ tempStagingDir);
+
+ Assertions.assertEquals("echo", new File(result.executable).getName());
+ Assertions.assertEquals(2, ((ShellJobTemplate) result).scripts().size());
+ List<String> scriptNames =
+ ((ShellJobTemplate) result)
+ .scripts().stream()
+ .map(script -> new File(script).getName())
+ .collect(Collectors.toList());
+ Assertions.assertTrue(scriptNames.contains(testScript1.getName()));
+ Assertions.assertTrue(scriptNames.contains(testScript2.getName()));
+ }
+
@Test
public void testCreateSparkRuntimeJobTemplate() throws IOException {
File executable = Files.createTempFile(tempDir.toPath(), "testSparkJob",
".jar").toFile();
@@ -311,4 +366,96 @@ public class TestJobTemplate {
ImmutableMap.of("spark.executor.memory", "4g", "spark.driver.cores",
"2"),
((SparkJobTemplate) result).configs());
}
+
+ @Test
+ public void testCreateSparkRuntimeJobTemplateWithReplacements() throws
IOException {
+ File executable = Files.createTempFile(tempDir.toPath(), "testSparkJob",
".jar").toFile();
+ File jar1 = Files.createTempFile(tempDir.toPath(), "testJar1",
".jar").toFile();
+ File jar2 = Files.createTempFile(tempDir.toPath(), "testJar2",
".jar").toFile();
+
+ File file1 = Files.createTempFile(tempDir.toPath(), "testFile1",
".txt").toFile();
+ File file2 = Files.createTempFile(tempDir.toPath(), "testFile2",
".txt").toFile();
+
+ File archive1 = Files.createTempFile(tempDir.toPath(), "testArchive1",
".zip").toFile();
+
+ SparkJobTemplate sparkJobTemplate =
+ SparkJobTemplate.builder()
+ .withName("testSparkJob")
+ .withComment("This is a test Spark job template")
+ .withExecutable(executable.toURI().toString().replace("test",
"{{env}}"))
+ .withClassName("org.apache.gravitino.TestSparkJob")
+ .withArguments(Lists.newArrayList("arg1", "arg2", "{{arg3}}"))
+ .withEnvironments(ImmutableMap.of("ENV_VAR1", "{{val1}}",
"ENV_VAR2", "{{val2}}"))
+ .withCustomFields(ImmutableMap.of("customField1",
"{{customVal1}}"))
+ .withJars(
+ Lists.newArrayList(
+ jar1.toURI().toString().replace("test", "{{env}}"),
+ jar2.toURI().toString().replace("test", "{{env}}")))
+ .withFiles(
+ Lists.newArrayList(
+ file1.toURI().toString().replace("test", "{{env}}"),
+ file2.toURI().toString().replace("test", "{{env}}")))
+ .withArchives(
+ Lists.newArrayList(archive1.toURI().toString().replace("test",
"{{env}}")))
+ .withConfigs(
+ ImmutableMap.of(
+ "spark.executor.memory",
+ "{{executor-mem}}",
+ "spark.driver.cores",
+ "{{driver-cores}}"))
+ .build();
+
+ JobTemplateEntity entity =
+ JobTemplateEntity.builder()
+ .withId(1L)
+ .withName(sparkJobTemplate.name())
+ .withComment(sparkJobTemplate.comment())
+ .withNamespace(NamespaceUtil.ofJobTemplate("test"))
+ .withTemplateContent(
+
JobTemplateEntity.TemplateContent.fromJobTemplate(sparkJobTemplate))
+ .withAuditInfo(
+
AuditInfo.builder().withCreator("test").withCreateTime(Instant.now()).build())
+ .build();
+
+ JobTemplate result =
+ JobManager.createRuntimeJobTemplate(
+ entity,
+ ImmutableMap.of(
+ "arg3", "value3",
+ "val1", "value1",
+ "val2", "value2",
+ "customVal1", "customValue1",
+ "executor-mem", "4g",
+ "driver-cores", "2",
+ "env", "test"),
+ tempStagingDir);
+
+ Assertions.assertEquals(executable.getName(), new
File(result.executable).getName());
+ Assertions.assertEquals(Lists.newArrayList("arg1", "arg2", "value3"),
result.arguments());
+ Assertions.assertEquals(
+ ImmutableMap.of("ENV_VAR1", "value1", "ENV_VAR2", "value2"),
result.environments());
+ Assertions.assertEquals(ImmutableMap.of("customField1", "customValue1"),
result.customFields());
+
+ Assertions.assertEquals(2, ((SparkJobTemplate) result).jars().size());
+ List<String> jarNames =
+ ((SparkJobTemplate) result)
+ .jars().stream().map(jar -> new
File(jar).getName()).collect(Collectors.toList());
+ Assertions.assertTrue(jarNames.contains(jar1.getName()));
+ Assertions.assertTrue(jarNames.contains(jar2.getName()));
+
+ Assertions.assertEquals(2, ((SparkJobTemplate) result).files().size());
+ List<String> fileNames =
+ ((SparkJobTemplate) result)
+ .files().stream().map(file -> new
File(file).getName()).collect(Collectors.toList());
+ Assertions.assertTrue(fileNames.contains(file1.getName()));
+ Assertions.assertTrue(fileNames.contains(file2.getName()));
+
+ Assertions.assertEquals(1, ((SparkJobTemplate) result).archives().size());
+ List<String> archiveNames =
+ ((SparkJobTemplate) result)
+ .archives().stream()
+ .map(archive -> new File(archive).getName())
+ .collect(Collectors.toList());
+ Assertions.assertTrue(archiveNames.contains(archive1.getName()));
+ }
}
diff --git a/docs/manage-jobs-in-gravitino.md b/docs/manage-jobs-in-gravitino.md
index d94ad6d204..9d31215da7 100644
--- a/docs/manage-jobs-in-gravitino.md
+++ b/docs/manage-jobs-in-gravitino.md
@@ -16,12 +16,12 @@ import TabItem from '@theme/TabItem';
Starting from 1.0.0, Apache Gravitino introduces a new submodule called the
job system for users to
register, run, and manage jobs. This job system integrates with the existing
metadata
-management, enabling users to execute the jobs or actions based on the
metadata,
-known as metadata-driven actions. For instance, this allows users to run jobs
for tasks such as
+management, enabling users to execute the jobs or actions based on the
metadata,
+known as metadata-driven actions. For instance, this allows users to run jobs
for tasks such as
compacting Iceberg tables or cleaning old data based on TTL properties.
The aim of the job system is to provide a unified way to manage job templates
and jobs,
-including registering job templates, running jobs based on the job templates,
and other related
+including registering job templates, running jobs based on the job templates,
and other related
tasks. The job system itself is a unified job submitter that allows users to
run jobs through it,
but it doesn't provide the actual job execution capabilities. Instead, it
relies on the
existing job executors (schedulers), such as Apache Airflow, Apache Livy, to
execute the jobs.
@@ -72,13 +72,10 @@ Here is a brief description of the fields in the job
template:
- `jobType`: The type of the job template, use `shell` for a shell job
template.
- `comment`: A comment for the job template, which can be used to describe the
job template.
- `executable`: The path to the executable script, which can be a shell script
or any executable script.
-- `arguments`: The arguments to pass to the executable script, you can use
placeholders like `{{arg1}}`
- and `{{arg2}}` to be replaced with actual values when running the job.
-- `environments`: The environment variables to set when running the job, you
can use placeholders like
- `{{value1}}` and `{{value2}}` to be replaced with actual values when running
the job.
+- `arguments`: The arguments to pass to the executable script.
+- `environments`: The environment variables to set when running the job.
- `customFields`: Custom fields for the job template, which can be used to
store additional
- information, you can use placeholders like `{{value1}}` and `{{value2}}` to
be replaced with actual
- values when running the job.
+ information.
- `scripts`: A list of scripts that the main executable script can use.
Please note that:
@@ -87,10 +84,9 @@ Please note that:
Gravitino supports accessing files from the local file system, HTTP(S)
URLs, and FTP(S) URLs
(more distributed file system support will be added in the future). So the
`executable` and
`scripts` can be a local file path, or a URL like
`http://example.com/my_script.sh`.
-2. The `arguments`, `environments`, and `customFields` can use placeholders
like `{{arg1}}` and
- `{{value1}}` to be replaced with actual values when running the job. The
placeholders will be
- replaced with the actual values when running the job, so you can use them
to pass dynamic values
- to the job template.
+2. The `executable`, `arguments`, `environments`, `customFields` and `scripts`
can use placeholders
+   like `{{arg1}}` and `{{value1}}`; the style is `{{foo}}`. They will be
replaced with
+ the actual values when running the job, so you can use them to pass dynamic
values to the job template.
3. Gravitino will copy the `executable` and `scripts` files to the job working
directory
when running the job, so you can use the relative path in the `executable`
and `scripts` to
refer to other scripts in the job working directory.
@@ -134,14 +130,10 @@ Here is a brief description of the fields in the Spark
job template:
- `comment`: A comment for the job template, which can be used to describe the
job template.
- `executable`: The path to the Spark application JAR file, which can be a
local file path or a URL
with a supported scheme.
-- `arguments`: The arguments to pass to the Spark application, you can use
placeholders like
- `{{arg1}}` and `{{arg2}}` to be replaced with actual values when running the
job.
-- `environments`: The environment variables to set when running the job, you
can use placeholders like
- `{{value1}}` and `{{value2}}` to be replaced with actual values when running
the job.
+- `arguments`: The arguments to pass to the Spark application.
+- `environments`: The environment variables to set when running the job.
- `customFields`: Custom fields for the job template, which can be used to
store additional information.
- It can use placeholders like `{{value1}}` and `{{value2}}` to be replaced
with actual values
- when running the job.
-- `className`: The main class of the Spark application. It is required for
Java/Scala Spark
+- `className`: The main class of the Spark application. It is required for
Java/Scala Spark
application. For PySpark application, this field can be `null` instead.
- `jars`: A list of JAR files to add to the Spark job classpath, which can be
a local file path or a URL
with a supported scheme.
@@ -149,8 +141,7 @@ Here is a brief description of the fields in the Spark job
template:
file path or a URL with a supported scheme.
- `archives`: A list of archives to be extracted to the working directory of
the Spark job, which
can be a local file path or a URL with a supported scheme.
-- `configs`: A map of Spark configurations to set when running the Spark job,
which can use placeholders
- like `{{value1}}` to be replaced with actual values when running the job.
+- `configs`: A map of Spark configurations to set when running the Spark job.
Note that:
@@ -159,10 +150,10 @@ Note that:
FTP(S) URLs (more distributed file system supports will be added in the
future). So the
`executable`, `jars`, `files`, and `archives` can be a local file path, or
a URL like
`http://example.com/my_spark_app.jar`.
-2. The `arguments`, `environments`, `customFields`, and `configs` can use
placeholders like
- `{{arg1}}` and `{{value1}}` to be replaced with actual values when running
the job. The placeholders
- will be replaced with the actual values when running the job, so you can
use them to pass dynamic
- values to the job template.
+2. The `executable`, `arguments`, `environments`, `customFields`, `className`,
`jars`, `files`,
+ `archives`, and `configs` can use placeholders like `{{arg1}}` and
`{{value1}}`; the style is
+ `{{foo}}`. They will be replaced with the actual values when running the
job, so you can use them
+ to pass dynamic values to the job template.
3. Gravitino will copy the `executable`, `jars`, `files`, and `archives` files
to the job working
directory when running the job, so you can use the relative path in the
`executable`, `jars`,
`files`, and `archives` to refer to other files in the job working
directory.
@@ -440,7 +431,7 @@ placeholders in the job template.
Gravitino leverages the job executor to run the job, so you need to specify
the job executor
through configuration `gravitino.job.executor`. By default, it is set to
"local", which means
-the job will be launched as a process within the same machine that runs the
Gravitino server. Note
+the job will be launched as a process within the same machine that runs the
Gravitino server. Note
that the local job executor is only for testing. If you want to run the job in
a distributed environment,
you need to implement your own `JobExecutor` and set the configuration, please
see
[Implement a custom job executor](#implement-a-custom-job-executor) section
below.