This is an automated email from the ASF dual-hosted git repository.

yuqi4733 pushed a commit to branch branch-1.0
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/branch-1.0 by this push:
     new d9f26fcc6b [#8865] improve(core): Add the placeholder support for all 
the job template fields (#8936)
d9f26fcc6b is described below

commit d9f26fcc6bd91c9ba9e93a1bb3ad0d9bea0123ab
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Oct 28 20:43:03 2025 +0800

    [#8865] improve(core): Add the placeholder support for all the job template 
fields (#8936)
    
    ### What changes were proposed in this pull request?
    
    This PR adds the placeholder support for all the job template fields to
    give users the flexibility to control.
    
    ### Why are the changes needed?
    
    To support multi-version job artifacts.
    
    Fix: #8865
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    UTs been added.
    
    Co-authored-by: Jerry Shao <[email protected]>
    Co-authored-by: Mini Yu <[email protected]>
---
 .../java/org/apache/gravitino/job/JobManager.java  |  40 +++++-
 .../org/apache/gravitino/job/TestJobTemplate.java  | 147 +++++++++++++++++++++
 docs/manage-jobs-in-gravitino.md                   |  45 +++----
 3 files changed, 199 insertions(+), 33 deletions(-)

diff --git a/core/src/main/java/org/apache/gravitino/job/JobManager.java 
b/core/src/main/java/org/apache/gravitino/job/JobManager.java
index 61033b763d..b6f255ae67 100644
--- a/core/src/main/java/org/apache/gravitino/job/JobManager.java
+++ b/core/src/main/java/org/apache/gravitino/job/JobManager.java
@@ -671,7 +671,9 @@ public class JobManager implements JobOperationDispatcher {
     String comment = jobTemplateEntity.comment();
 
     JobTemplateEntity.TemplateContent content = 
jobTemplateEntity.templateContent();
-    String executable = fetchFileFromUri(content.executable(), stagingDir, 
TIMEOUT_IN_MS);
+    String executable =
+        fetchFileFromUri(
+            replacePlaceholder(content.executable(), jobConf), stagingDir, 
TIMEOUT_IN_MS);
 
     List<String> args =
         content.arguments().stream()
@@ -692,7 +694,13 @@ public class JobManager implements JobOperationDispatcher {
 
     // For shell job template
     if (content.jobType() == JobTemplate.JobType.SHELL) {
-      List<String> scripts = fetchFilesFromUri(content.scripts(), stagingDir, 
TIMEOUT_IN_MS);
+      List<String> scripts =
+          content.scripts().stream()
+              .map(
+                  script ->
+                      fetchFileFromUri(
+                          replacePlaceholder(script, jobConf), stagingDir, 
TIMEOUT_IN_MS))
+              .collect(Collectors.toList());
 
       return ShellJobTemplate.builder()
           .withName(name)
@@ -707,10 +715,30 @@ public class JobManager implements JobOperationDispatcher 
{
 
     // For Spark job template
     if (content.jobType() == JobTemplate.JobType.SPARK) {
-      String className = content.className();
-      List<String> jars = fetchFilesFromUri(content.jars(), stagingDir, 
TIMEOUT_IN_MS);
-      List<String> files = fetchFilesFromUri(content.files(), stagingDir, 
TIMEOUT_IN_MS);
-      List<String> archives = fetchFilesFromUri(content.archives(), 
stagingDir, TIMEOUT_IN_MS);
+      String className = replacePlaceholder(content.className(), jobConf);
+      List<String> jars =
+          content.jars().stream()
+              .map(
+                  jar ->
+                      fetchFileFromUri(replacePlaceholder(jar, jobConf), 
stagingDir, TIMEOUT_IN_MS))
+              .collect(Collectors.toList());
+
+      List<String> files =
+          content.files().stream()
+              .map(
+                  file ->
+                      fetchFileFromUri(
+                          replacePlaceholder(file, jobConf), stagingDir, 
TIMEOUT_IN_MS))
+              .collect(Collectors.toList());
+
+      List<String> archives =
+          content.archives().stream()
+              .map(
+                  archive ->
+                      fetchFileFromUri(
+                          replacePlaceholder(archive, jobConf), stagingDir, 
TIMEOUT_IN_MS))
+              .collect(Collectors.toList());
+
       Map<String, String> configs =
           content.configs().entrySet().stream()
               .collect(
diff --git a/core/src/test/java/org/apache/gravitino/job/TestJobTemplate.java 
b/core/src/test/java/org/apache/gravitino/job/TestJobTemplate.java
index 7c53f4ec4c..3ce6d1b87c 100644
--- a/core/src/test/java/org/apache/gravitino/job/TestJobTemplate.java
+++ b/core/src/test/java/org/apache/gravitino/job/TestJobTemplate.java
@@ -222,6 +222,61 @@ public class TestJobTemplate {
     Assertions.assertTrue(scriptNames.contains(testScript2.getName()));
   }
 
+  @Test
+  public void testCreateShellRuntimeJobTemplateWithReplacementsInScripts() 
throws IOException {
+    File testScript1 = Files.createTempFile(tempDir.toPath(), "testScript1", 
".sh").toFile();
+    File testScript2 = Files.createTempFile(tempDir.toPath(), "testScript2", 
".sh").toFile();
+
+    ShellJobTemplate shellJobTemplate =
+        ShellJobTemplate.builder()
+            .withName("testShellJob1")
+            .withComment("This is a test shell job template")
+            .withExecutable("/bin/echo")
+            .withArguments(Lists.newArrayList("arg1", "arg2", "{{arg3}}, 
{{arg4}}"))
+            .withEnvironments(ImmutableMap.of("ENV_VAR1", "{{val1}}", 
"ENV_VAR2", "{{val2}}"))
+            .withCustomFields(ImmutableMap.of("customField1", 
"{{customVal1}}"))
+            .withScripts(
+                Lists.newArrayList(
+                    testScript1.toURI().toString().replace("testScript1", 
"{{scriptName1}}"),
+                    testScript2.toURI().toString().replace("testScript2", 
"{{scriptName2}}")))
+            .build();
+
+    JobTemplateEntity entity =
+        JobTemplateEntity.builder()
+            .withId(1L)
+            .withName(shellJobTemplate.name())
+            .withComment(shellJobTemplate.comment())
+            .withNamespace(NamespaceUtil.ofJobTemplate("test"))
+            .withTemplateContent(
+                
JobTemplateEntity.TemplateContent.fromJobTemplate(shellJobTemplate))
+            .withAuditInfo(
+                
AuditInfo.builder().withCreator("test").withCreateTime(Instant.now()).build())
+            .build();
+
+    JobTemplate result =
+        JobManager.createRuntimeJobTemplate(
+            entity,
+            ImmutableMap.of(
+                "arg3", "value3",
+                "arg4", "value4",
+                "val1", "value1",
+                "val2", "value2",
+                "customVal1", "customValue1",
+                "scriptName1", "testScript1",
+                "scriptName2", "testScript2"),
+            tempStagingDir);
+
+    Assertions.assertEquals("echo", new File(result.executable).getName());
+    Assertions.assertEquals(2, ((ShellJobTemplate) result).scripts().size());
+    List<String> scriptNames =
+        ((ShellJobTemplate) result)
+            .scripts().stream()
+                .map(script -> new File(script).getName())
+                .collect(Collectors.toList());
+    Assertions.assertTrue(scriptNames.contains(testScript1.getName()));
+    Assertions.assertTrue(scriptNames.contains(testScript2.getName()));
+  }
+
   @Test
   public void testCreateSparkRuntimeJobTemplate() throws IOException {
     File executable = Files.createTempFile(tempDir.toPath(), "testSparkJob", 
".jar").toFile();
@@ -311,4 +366,96 @@ public class TestJobTemplate {
         ImmutableMap.of("spark.executor.memory", "4g", "spark.driver.cores", 
"2"),
         ((SparkJobTemplate) result).configs());
   }
+
+  @Test
+  public void testCreateSparkRuntimeJobTemplateWithReplacements() throws 
IOException {
+    File executable = Files.createTempFile(tempDir.toPath(), "testSparkJob", 
".jar").toFile();
+    File jar1 = Files.createTempFile(tempDir.toPath(), "testJar1", 
".jar").toFile();
+    File jar2 = Files.createTempFile(tempDir.toPath(), "testJar2", 
".jar").toFile();
+
+    File file1 = Files.createTempFile(tempDir.toPath(), "testFile1", 
".txt").toFile();
+    File file2 = Files.createTempFile(tempDir.toPath(), "testFile2", 
".txt").toFile();
+
+    File archive1 = Files.createTempFile(tempDir.toPath(), "testArchive1", 
".zip").toFile();
+
+    SparkJobTemplate sparkJobTemplate =
+        SparkJobTemplate.builder()
+            .withName("testSparkJob")
+            .withComment("This is a test Spark job template")
+            .withExecutable(executable.toURI().toString().replace("test", 
"{{env}}"))
+            .withClassName("org.apache.gravitino.TestSparkJob")
+            .withArguments(Lists.newArrayList("arg1", "arg2", "{{arg3}}"))
+            .withEnvironments(ImmutableMap.of("ENV_VAR1", "{{val1}}", 
"ENV_VAR2", "{{val2}}"))
+            .withCustomFields(ImmutableMap.of("customField1", 
"{{customVal1}}"))
+            .withJars(
+                Lists.newArrayList(
+                    jar1.toURI().toString().replace("test", "{{env}}"),
+                    jar2.toURI().toString().replace("test", "{{env}}")))
+            .withFiles(
+                Lists.newArrayList(
+                    file1.toURI().toString().replace("test", "{{env}}"),
+                    file2.toURI().toString().replace("test", "{{env}}")))
+            .withArchives(
+                Lists.newArrayList(archive1.toURI().toString().replace("test", 
"{{env}}")))
+            .withConfigs(
+                ImmutableMap.of(
+                    "spark.executor.memory",
+                    "{{executor-mem}}",
+                    "spark.driver.cores",
+                    "{{driver-cores}}"))
+            .build();
+
+    JobTemplateEntity entity =
+        JobTemplateEntity.builder()
+            .withId(1L)
+            .withName(sparkJobTemplate.name())
+            .withComment(sparkJobTemplate.comment())
+            .withNamespace(NamespaceUtil.ofJobTemplate("test"))
+            .withTemplateContent(
+                
JobTemplateEntity.TemplateContent.fromJobTemplate(sparkJobTemplate))
+            .withAuditInfo(
+                
AuditInfo.builder().withCreator("test").withCreateTime(Instant.now()).build())
+            .build();
+
+    JobTemplate result =
+        JobManager.createRuntimeJobTemplate(
+            entity,
+            ImmutableMap.of(
+                "arg3", "value3",
+                "val1", "value1",
+                "val2", "value2",
+                "customVal1", "customValue1",
+                "executor-mem", "4g",
+                "driver-cores", "2",
+                "env", "test"),
+            tempStagingDir);
+
+    Assertions.assertEquals(executable.getName(), new 
File(result.executable).getName());
+    Assertions.assertEquals(Lists.newArrayList("arg1", "arg2", "value3"), 
result.arguments());
+    Assertions.assertEquals(
+        ImmutableMap.of("ENV_VAR1", "value1", "ENV_VAR2", "value2"), 
result.environments());
+    Assertions.assertEquals(ImmutableMap.of("customField1", "customValue1"), 
result.customFields());
+
+    Assertions.assertEquals(2, ((SparkJobTemplate) result).jars().size());
+    List<String> jarNames =
+        ((SparkJobTemplate) result)
+            .jars().stream().map(jar -> new 
File(jar).getName()).collect(Collectors.toList());
+    Assertions.assertTrue(jarNames.contains(jar1.getName()));
+    Assertions.assertTrue(jarNames.contains(jar2.getName()));
+
+    Assertions.assertEquals(2, ((SparkJobTemplate) result).files().size());
+    List<String> fileNames =
+        ((SparkJobTemplate) result)
+            .files().stream().map(file -> new 
File(file).getName()).collect(Collectors.toList());
+    Assertions.assertTrue(fileNames.contains(file1.getName()));
+    Assertions.assertTrue(fileNames.contains(file2.getName()));
+
+    Assertions.assertEquals(1, ((SparkJobTemplate) result).archives().size());
+    List<String> archiveNames =
+        ((SparkJobTemplate) result)
+            .archives().stream()
+                .map(archive -> new File(archive).getName())
+                .collect(Collectors.toList());
+    Assertions.assertTrue(archiveNames.contains(archive1.getName()));
+  }
 }
diff --git a/docs/manage-jobs-in-gravitino.md b/docs/manage-jobs-in-gravitino.md
index d94ad6d204..9d31215da7 100644
--- a/docs/manage-jobs-in-gravitino.md
+++ b/docs/manage-jobs-in-gravitino.md
@@ -16,12 +16,12 @@ import TabItem from '@theme/TabItem';
 
 Starting from 1.0.0, Apache Gravitino introduces a new submodule called the 
job system for users to
 register, run, and manage jobs. This job system integrates with the existing 
metadata
-management, enabling users to execute the jobs or actions based on the 
metadata, 
-known as metadata-driven actions. For instance, this allows users to run jobs 
for tasks such as 
+management, enabling users to execute the jobs or actions based on the 
metadata,
+known as metadata-driven actions. For instance, this allows users to run jobs 
for tasks such as
 compacting Iceberg tables or cleaning old data based on TTL properties.
 
 The aim of the job system is to provide a unified way to manage job templates 
and jobs,
-including registering job templates, running jobs based on the job templates, 
and other related 
+including registering job templates, running jobs based on the job templates, 
and other related
 tasks. The job system itself is a unified job submitter that allows users to 
run jobs through it,
 but it doesn't provide the actual job execution capabilities. Instead, it 
relies on the
 existing job executors (schedulers), such as Apache Airflow, Apache Livy, to 
execute the jobs.
@@ -72,13 +72,10 @@ Here is a brief description of the fields in the job 
template:
 - `jobType`: The type of the job template, use `shell` for a shell job 
template.
 - `comment`: A comment for the job template, which can be used to describe the 
job template.
 - `executable`: The path to the executable script, which can be a shell script 
or any executable script.
-- `arguments`: The arguments to pass to the executable script, you can use 
placeholders like `{{arg1}}`
-  and `{{arg2}}` to be replaced with actual values when running the job.
-- `environments`: The environment variables to set when running the job, you 
can use placeholders like
-  `{{value1}}` and `{{value2}}` to be replaced with actual values when running 
the job.
+- `arguments`: The arguments to pass to the executable script.
+- `environments`: The environment variables to set when running the job.
 - `customFields`: Custom fields for the job template, which can be used to 
store additional
-  information, you can use placeholders like `{{value1}}` and `{{value2}}` to 
be replaced with actual
-  values when running the job.
+  information.
 - `scripts`: A list of scripts that the main executable script can use.
 
 Please note that:
@@ -87,10 +84,9 @@ Please note that:
    Gravitino supports accessing files from the local file system, HTTP(S) 
URLs, and FTP(S) URLs
    (more distributed file system support will be added in the future). So the 
`executable` and
    `scripts` can be a local file path, or a URL like 
`http://example.com/my_script.sh`.
-2. The `arguments`, `environments`, and `customFields` can use placeholders 
like `{{arg1}}` and
-   `{{value1}}` to be replaced with actual values when running the job. The 
placeholders will be
-   replaced with the actual values when running the job, so you can use them 
to pass dynamic values
-   to the job template.
+2. The `executable`, `arguments`, `environments`, `customFields`, and `scripts` 
can use placeholders
+   like `{{arg1}}` and `{{value1}}` (the placeholder style is `{{foo}}`). They 
will be replaced with
+   the actual values when running the job, so you can use them to pass dynamic 
values to the job template.
 3. Gravitino will copy the `executable` and `scripts` files to the job working 
directory
    when running the job, so you can use the relative path in the `executable` 
and `scripts` to
    refer to other scripts in the job working directory.
@@ -134,14 +130,10 @@ Here is a brief description of the fields in the Spark 
job template:
 - `comment`: A comment for the job template, which can be used to describe the 
job template.
 - `executable`: The path to the Spark application JAR file, which can be a 
local file path or a URL
   with a supported scheme.
-- `arguments`: The arguments to pass to the Spark application, you can use 
placeholders like
-  `{{arg1}}` and `{{arg2}}` to be replaced with actual values when running the 
job.
-- `environments`: The environment variables to set when running the job, you 
can use placeholders like
-  `{{value1}}` and `{{value2}}` to be replaced with actual values when running 
the job.
+- `arguments`: The arguments to pass to the Spark application.
+- `environments`: The environment variables to set when running the job.
 - `customFields`: Custom fields for the job template, which can be used to 
store additional information.
-  It can use placeholders like `{{value1}}` and `{{value2}}` to be replaced 
with actual values
-  when running the job.
-- `className`: The main class of the Spark application. It is required for 
Java/Scala Spark 
+- `className`: The main class of the Spark application. It is required for 
Java/Scala Spark
   application. For PySpark application, this field can be `null` instead.
 - `jars`: A list of JAR files to add to the Spark job classpath, which can be 
a local file path or a URL
   with a supported scheme.
@@ -149,8 +141,7 @@ Here is a brief description of the fields in the Spark job 
template:
   file path or a URL with a supported scheme.
 - `archives`: A list of archives to be extracted to the working directory of 
the Spark job, which
   can be a local file path or a URL with a supported scheme.
-- `configs`: A map of Spark configurations to set when running the Spark job, 
which can use placeholders
-  like `{{value1}}` to be replaced with actual values when running the job.
+- `configs`: A map of Spark configurations to set when running the Spark job.
 
 Note that:
 
@@ -159,10 +150,10 @@ Note that:
    FTP(S) URLs (more distributed file system supports will be added in the 
future). So the
    `executable`, `jars`, `files`, and `archives` can be a local file path, or 
a URL like
    `http://example.com/my_spark_app.jar`.
-2. The `arguments`, `environments`, `customFields`, and `configs` can use 
placeholders like
-   `{{arg1}}` and `{{value1}}` to be replaced with actual values when running 
the job. The placeholders
-   will be replaced with the actual values when running the job, so you can 
use them to pass dynamic
-   values to the job template.
+2. The `executable`, `arguments`, `environments`, `customFields`, `className`, 
`jars`, `files`,
+   `archives`, and `configs` can use placeholders like `{{arg1}}` and 
`{{value1}}` (the
+   placeholder style is `{{foo}}`). They will be replaced with the actual 
values when running
+   the job, so you can use them to pass dynamic values to the job template.
 3. Gravitino will copy the `executable`, `jars`, `files`, and `archives` files 
to the job working
    directory when running the job, so you can use the relative path in the 
`executable`, `jars`,
    `files`, and `archives` to refer to other files in the job working 
directory.
@@ -440,7 +431,7 @@ placeholders in the job template.
 
 Gravitino leverages the job executor to run the job, so you need to specify 
the job executor
 through configuration `gravitino.job.executor`. By default, it is set to 
"local", which means
-the job will be launched as a process within the same machine that runs the 
Gravitino server. Note 
+the job will be launched as a process within the same machine that runs the 
Gravitino server. Note
 that the local job executor is only for testing. If you want to run the job in 
a distributed environment,
 you need to implement your own `JobExecutor` and set the configuration, please 
see
 [Implement a custom job executor](#implement-a-custom-job-executor) section 
below.

Reply via email to