This is an automated email from the ASF dual-hosted git repository.
jshao pushed a commit to branch branch-1.0
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/branch-1.0 by this push:
new 748672186c [#7962]feat(core): Add spark process builder support for
local job executor (#8667)
748672186c is described below
commit 748672186c55cb60185146a1220b6afe83758dab
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Sep 24 12:22:05 2025 +0800
[#7962]feat(core): Add spark process builder support for local job executor
(#8667)
### What changes were proposed in this pull request?
This PR adds the Spark process builder for the local job executor, so users
can run Spark jobs in a local environment.
### Why are the changes needed?
This is a part of the work of the job system
Fix: #7962
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
UT and local tests.
Co-authored-by: Jerry Shao <[email protected]>
---
.../org/apache/gravitino/job/SparkJobTemplate.java | 9 +-
.../gravitino/dto/job/SparkJobTemplateDTO.java | 10 --
.../gravitino/dto/job/TestJobTemplateDTO.java | 32 ++---
.../job/local/LocalJobExecutorConfigs.java | 2 +
.../gravitino/job/local/LocalProcessBuilder.java | 6 +-
.../gravitino/job/local/ShellProcessBuilder.java | 7 +
.../gravitino/job/local/SparkProcessBuilder.java | 138 ++++++++++++++++++++
.../job/local/TestSparkProcessBuilder.java | 143 +++++++++++++++++++++
docs/manage-jobs-in-gravitino.md | 21 ++-
9 files changed, 324 insertions(+), 44 deletions(-)
diff --git a/api/src/main/java/org/apache/gravitino/job/SparkJobTemplate.java
b/api/src/main/java/org/apache/gravitino/job/SparkJobTemplate.java
index 13b6d8c6bf..1402dc455b 100644
--- a/api/src/main/java/org/apache/gravitino/job/SparkJobTemplate.java
+++ b/api/src/main/java/org/apache/gravitino/job/SparkJobTemplate.java
@@ -18,13 +18,11 @@
*/
package org.apache.gravitino.job;
-import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
-import org.apache.commons.lang3.StringUtils;
/**
* Represents a job template for executing Spark applications. This class
extends the JobTemplate
@@ -144,7 +142,9 @@ public class SparkJobTemplate extends JobTemplate {
@Override
public String toString() {
StringBuilder sb = new StringBuilder("\nSparkJobTemplate{\n");
- sb.append(" className='").append(className).append("',\n");
+ if (className != null) {
+ sb.append(" className='").append(className).append("',\n");
+ }
if (!jars.isEmpty()) {
sb.append(" jars=[\n");
@@ -280,9 +280,6 @@ public class SparkJobTemplate extends JobTemplate {
protected void validate() {
super.validate();
- Preconditions.checkArgument(
- StringUtils.isNotBlank(className), "Class name must not be null or
empty");
-
this.jars = jars != null ? ImmutableList.copyOf(jars) :
ImmutableList.of();
this.files = files != null ? ImmutableList.copyOf(files) :
ImmutableList.of();
this.archives = archives != null ? ImmutableList.copyOf(archives) :
ImmutableList.of();
diff --git
a/common/src/main/java/org/apache/gravitino/dto/job/SparkJobTemplateDTO.java
b/common/src/main/java/org/apache/gravitino/dto/job/SparkJobTemplateDTO.java
index 7b492639e8..8530196d17 100644
--- a/common/src/main/java/org/apache/gravitino/dto/job/SparkJobTemplateDTO.java
+++ b/common/src/main/java/org/apache/gravitino/dto/job/SparkJobTemplateDTO.java
@@ -19,7 +19,6 @@
package org.apache.gravitino.dto.job;
import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.base.Preconditions;
import java.util.List;
import java.util.Map;
import lombok.EqualsAndHashCode;
@@ -27,7 +26,6 @@ import lombok.Getter;
import lombok.ToString;
import lombok.experimental.Accessors;
import lombok.experimental.SuperBuilder;
-import org.apache.commons.lang3.StringUtils;
/** Represents a Spark Job Template Data Transfer Object (DTO). */
@Getter
@@ -57,12 +55,4 @@ public class SparkJobTemplateDTO extends JobTemplateDTO {
// Default constructor for Jackson
super();
}
-
- @Override
- public void validate() {
- super.validate();
-
- Preconditions.checkArgument(
- StringUtils.isNotBlank(className), "\"className\" is required and
cannot be empty");
- }
}
diff --git
a/common/src/test/java/org/apache/gravitino/dto/job/TestJobTemplateDTO.java
b/common/src/test/java/org/apache/gravitino/dto/job/TestJobTemplateDTO.java
index 3e85d35c1f..a2e3f23d38 100644
--- a/common/src/test/java/org/apache/gravitino/dto/job/TestJobTemplateDTO.java
+++ b/common/src/test/java/org/apache/gravitino/dto/job/TestJobTemplateDTO.java
@@ -376,6 +376,22 @@ public class TestJobTemplateDTO {
JsonUtils.objectMapper().readValue(nullConfigsSerJson,
JobTemplateDTO.class);
Assertions.assertEquals(nullConfigsJobTemplateDTO,
nullConfigsDeserJobTemplateDTO);
+ // Test className is null
+ SparkJobTemplateDTO nullClassNameJobTemplateDTO =
+ SparkJobTemplateDTO.builder()
+ .withJobType(JobTemplate.JobType.SPARK)
+ .withName("testSparkJobNullClassName")
+ .withExecutable("/path/to/spark-submit")
+
.withAudit(AuditDTO.builder().withCreator("test").withCreateTime(Instant.now()).build())
+ .build();
+
+ Assertions.assertDoesNotThrow(nullClassNameJobTemplateDTO::validate);
+ String nullClassNameSerJson =
+
JsonUtils.objectMapper().writeValueAsString(nullClassNameJobTemplateDTO);
+ JobTemplateDTO nullClassNameDeserJobTemplateDTO =
+ JsonUtils.objectMapper().readValue(nullClassNameSerJson,
JobTemplateDTO.class);
+ Assertions.assertEquals(nullClassNameJobTemplateDTO,
nullClassNameDeserJobTemplateDTO);
+
// Test name is null
Assertions.assertThrows(
IllegalArgumentException.class,
@@ -420,21 +436,5 @@ public class TestJobTemplateDTO {
template.validate();
},
"\"jobType\" is required and cannot be null");
-
- // Test className is null
- Assertions.assertThrows(
- IllegalArgumentException.class,
- () -> {
- SparkJobTemplateDTO template =
- SparkJobTemplateDTO.builder()
- .withJobType(JobTemplate.JobType.SPARK)
- .withName("testSparkJobNullClassName")
- .withExecutable("/path/to/spark-submit")
- .withAudit(
-
AuditDTO.builder().withCreator("test").withCreateTime(Instant.now()).build())
- .build();
- template.validate();
- },
- "\"className\" is required and cannot be empty");
}
}
diff --git
a/core/src/main/java/org/apache/gravitino/job/local/LocalJobExecutorConfigs.java
b/core/src/main/java/org/apache/gravitino/job/local/LocalJobExecutorConfigs.java
index 627f7b1fa1..22cb5d0f72 100644
---
a/core/src/main/java/org/apache/gravitino/job/local/LocalJobExecutorConfigs.java
+++
b/core/src/main/java/org/apache/gravitino/job/local/LocalJobExecutorConfigs.java
@@ -35,4 +35,6 @@ public class LocalJobExecutorConfigs {
public static final String JOB_STATUS_KEEP_TIME_MS = "jobStatusKeepTimeInMs";
public static final long DEFAULT_JOB_STATUS_KEEP_TIME_MS = 60 * 60 * 1000;
// 1 hour
+
+ public static final String SPARK_HOME = "sparkHome";
}
diff --git
a/core/src/main/java/org/apache/gravitino/job/local/LocalProcessBuilder.java
b/core/src/main/java/org/apache/gravitino/job/local/LocalProcessBuilder.java
index d516284679..0338dc437f 100644
--- a/core/src/main/java/org/apache/gravitino/job/local/LocalProcessBuilder.java
+++ b/core/src/main/java/org/apache/gravitino/job/local/LocalProcessBuilder.java
@@ -22,6 +22,7 @@ import java.io.File;
import java.util.Map;
import org.apache.gravitino.job.JobTemplate;
import org.apache.gravitino.job.ShellJobTemplate;
+import org.apache.gravitino.job.SparkJobTemplate;
public abstract class LocalProcessBuilder {
@@ -41,8 +42,11 @@ public abstract class LocalProcessBuilder {
public static LocalProcessBuilder create(JobTemplate jobTemplate,
Map<String, String> configs) {
if (jobTemplate instanceof ShellJobTemplate) {
return new ShellProcessBuilder((ShellJobTemplate) jobTemplate, configs);
+ } else if (jobTemplate instanceof SparkJobTemplate) {
+ return new SparkProcessBuilder((SparkJobTemplate) jobTemplate, configs);
} else {
- throw new IllegalArgumentException("Unsupported job template type: " +
jobTemplate.jobType());
+ throw new IllegalArgumentException(
+ "Unsupported job template type: " +
jobTemplate.getClass().getName());
}
}
}
diff --git
a/core/src/main/java/org/apache/gravitino/job/local/ShellProcessBuilder.java
b/core/src/main/java/org/apache/gravitino/job/local/ShellProcessBuilder.java
index 57e3b20c05..940bba246c 100644
--- a/core/src/main/java/org/apache/gravitino/job/local/ShellProcessBuilder.java
+++ b/core/src/main/java/org/apache/gravitino/job/local/ShellProcessBuilder.java
@@ -19,14 +19,19 @@
package org.apache.gravitino.job.local;
+import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import java.io.File;
import java.util.List;
import java.util.Map;
import org.apache.gravitino.job.ShellJobTemplate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class ShellProcessBuilder extends LocalProcessBuilder {
+ private static final Logger LOG =
LoggerFactory.getLogger(ShellProcessBuilder.class);
+
protected ShellProcessBuilder(ShellJobTemplate shellJobTemplate, Map<String,
String> configs) {
super(shellJobTemplate, configs);
}
@@ -51,6 +56,8 @@ public class ShellProcessBuilder extends LocalProcessBuilder {
builder.redirectOutput(outputFile);
builder.redirectError(errorFile);
+ LOG.info("Starting local shell job with command: {}", Joiner.on("
").join(commandList));
+
try {
return builder.start();
} catch (Exception e) {
diff --git
a/core/src/main/java/org/apache/gravitino/job/local/SparkProcessBuilder.java
b/core/src/main/java/org/apache/gravitino/job/local/SparkProcessBuilder.java
new file mode 100644
index 0000000000..d577c5af9f
--- /dev/null
+++ b/core/src/main/java/org/apache/gravitino/job/local/SparkProcessBuilder.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.job.local;
+
+import static
org.apache.gravitino.job.local.LocalJobExecutorConfigs.SPARK_HOME;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+import java.io.File;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.arrow.util.Preconditions;
+import org.apache.arrow.util.VisibleForTesting;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.job.SparkJobTemplate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The SparkProcessBuilder class is responsible for constructing and starting
a local Spark job
+ * process using the spark-submit command. Using local job executor to run
Spark job has some
+ * limitations: 1. It uses the aliveness of the spark-submit process to
determine if the job is
+ * still running, which is not accurate for the cluster deploy mode. 2. It
cannot support user
+ * impersonation for now.
+ */
+public class SparkProcessBuilder extends LocalProcessBuilder {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(SparkProcessBuilder.class);
+
+ private static final String ENV_SPARK_HOME = "SPARK_HOME";
+
+ private final String sparkSubmit;
+
+ protected SparkProcessBuilder(SparkJobTemplate sparkJobTemplate, Map<String,
String> configs) {
+ super(sparkJobTemplate, configs);
+ String sparkHome =
+
Optional.ofNullable(configs.get(SPARK_HOME)).orElse(System.getenv(ENV_SPARK_HOME));
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(sparkHome),
+ "gravitino.jobExecutor.local.sparkHome or SPARK_HOME environment
variable must"
+ + " be set for Spark jobs");
+
+ this.sparkSubmit = sparkHome + "/bin/spark-submit";
+ File sparkSubmitFile = new File(sparkSubmit);
+ Preconditions.checkArgument(
+ sparkSubmitFile.canExecute(),
+ "spark-submit is not found or not executable: " + sparkSubmit);
+ }
+
+ @VisibleForTesting
+ static List<String> generateSparkSubmitCommand(
+ String sparkSubmit, SparkJobTemplate sparkJobTemplate) {
+ List<String> commandList = Lists.newArrayList(sparkSubmit);
+
+ // Add the main class
+ if (StringUtils.isNotBlank(sparkJobTemplate.className())) {
+ commandList.add("--class");
+ commandList.add(sparkJobTemplate.className());
+ }
+
+ // Add jars if it is not empty
+ if (!sparkJobTemplate.jars().isEmpty()) {
+ commandList.add("--jars");
+ commandList.add(String.join(",", sparkJobTemplate.jars()));
+ }
+
+ // Add files if it is not empty
+ if (!sparkJobTemplate.files().isEmpty()) {
+ commandList.add("--files");
+ commandList.add(String.join(",", sparkJobTemplate.files()));
+ }
+
+ // Add archives if it is not empty
+ if (!sparkJobTemplate.archives().isEmpty()) {
+ commandList.add("--archives");
+ commandList.add(String.join(",", sparkJobTemplate.archives()));
+ }
+
+ // Add the Spark configs
+ for (Map.Entry<String, String> entry :
sparkJobTemplate.configs().entrySet()) {
+ commandList.add("--conf");
+ commandList.add(entry.getKey() + "=" + entry.getValue());
+ }
+
+ // Add the main executable
+ commandList.add(sparkJobTemplate.executable());
+
+ // Add the arguments
+ commandList.addAll(sparkJobTemplate.arguments());
+
+ return commandList;
+ }
+
+ @Override
+ public Process start() {
+ SparkJobTemplate sparkJobTemplate = (SparkJobTemplate) jobTemplate;
+ List<String> commandList = generateSparkSubmitCommand(sparkSubmit,
sparkJobTemplate);
+
+ ProcessBuilder builder = new ProcessBuilder(commandList);
+ builder.directory(workingDirectory);
+ builder.environment().putAll(sparkJobTemplate.environments());
+
+ File outputFile = new File(workingDirectory, "output.log");
+ File errorFile = new File(workingDirectory, "error.log");
+
+ builder.redirectOutput(outputFile);
+ builder.redirectError(errorFile);
+
+ LOG.info(
+ "Starting local Spark job with command: {}, environment variables: {}",
+ Joiner.on(" ").join(commandList),
+ Joiner.on(", ").withKeyValueSeparator(":
").join(sparkJobTemplate.environments()));
+
+ try {
+ return builder.start();
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to start Spark process", e);
+ }
+ }
+}
diff --git
a/core/src/test/java/org/apache/gravitino/job/local/TestSparkProcessBuilder.java
b/core/src/test/java/org/apache/gravitino/job/local/TestSparkProcessBuilder.java
new file mode 100644
index 0000000000..930bf1a4e0
--- /dev/null
+++
b/core/src/test/java/org/apache/gravitino/job/local/TestSparkProcessBuilder.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.job.local;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import java.util.List;
+import org.apache.gravitino.job.SparkJobTemplate;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class TestSparkProcessBuilder {
+
+ @Test
+ public void testGenerateSparkSubmitCommand() {
+ String sparkSubmitPath = "/path/to/spark-submit";
+
+ SparkJobTemplate template1 =
+ SparkJobTemplate.builder()
+ .withName("template1")
+ .withExecutable("/path/to/spark-demo.jar")
+ .withArguments(Lists.newArrayList("arg1", "arg2"))
+ .withClassName("com.example.MainClass")
+ .build();
+
+ List<String> command1 =
+ SparkProcessBuilder.generateSparkSubmitCommand(sparkSubmitPath,
template1);
+
+ Assertions.assertEquals(sparkSubmitPath, command1.get(0));
+ Assertions.assertTrue(command1.contains("--class"));
+
+ int idx = command1.indexOf("--class");
+ Assertions.assertEquals("com.example.MainClass", command1.get(idx + 1));
+
+ Assertions.assertTrue(command1.contains("/path/to/spark-demo.jar"));
+
+ Assertions.assertTrue(command1.contains("arg1"));
+ Assertions.assertTrue(command1.contains("arg2"));
+
+ SparkJobTemplate template2 =
+ SparkJobTemplate.builder()
+ .withName("template2")
+ .withExecutable("/path/to/spark-demo.py")
+ .withArguments(Lists.newArrayList("arg1", "arg2"))
+ .build();
+
+ List<String> command2 =
+ SparkProcessBuilder.generateSparkSubmitCommand(sparkSubmitPath,
template2);
+
+ Assertions.assertEquals(sparkSubmitPath, command2.get(0));
+ Assertions.assertFalse(command2.contains("--class"));
+
+ Assertions.assertTrue(command2.contains("/path/to/spark-demo.py"));
+ Assertions.assertTrue(command2.contains("arg1"));
+ Assertions.assertTrue(command2.contains("arg2"));
+
+ SparkJobTemplate template3 =
+ SparkJobTemplate.builder()
+ .withName("template3")
+ .withExecutable("/path/to/spark-demo.jar")
+ .withArguments(Lists.newArrayList("arg1", "arg2"))
+ .withJars(Lists.newArrayList("/path/to/lib1.jar",
"/path/to/lib2.jar"))
+ .withFiles(Lists.newArrayList("/path/to/file1.txt",
"/path/to/file2.txt"))
+ .withArchives(Lists.newArrayList("/path/to/archive1.zip",
"/path/to/archive2.zip"))
+ .withClassName("com.example.MainClass")
+ .build();
+
+ List<String> command3 =
+ SparkProcessBuilder.generateSparkSubmitCommand(sparkSubmitPath,
template3);
+
+ Assertions.assertEquals(sparkSubmitPath, command3.get(0));
+ Assertions.assertTrue(command3.contains("--class"));
+
+ int idx3 = command3.indexOf("--class");
+ Assertions.assertEquals("com.example.MainClass", command3.get(idx3 + 1));
+
+ Assertions.assertTrue(command3.contains("--jars"));
+ int idxJars = command3.indexOf("--jars");
+ Assertions.assertEquals("/path/to/lib1.jar,/path/to/lib2.jar",
command3.get(idxJars + 1));
+
+ Assertions.assertTrue(command3.contains("--files"));
+ int idxFiles = command3.indexOf("--files");
+ Assertions.assertEquals("/path/to/file1.txt,/path/to/file2.txt",
command3.get(idxFiles + 1));
+
+ Assertions.assertTrue(command3.contains("--archives"));
+ int idxArchives = command3.indexOf("--archives");
+ Assertions.assertEquals(
+ "/path/to/archive1.zip,/path/to/archive2.zip",
command3.get(idxArchives + 1));
+
+ Assertions.assertTrue(command3.contains("/path/to/spark-demo.jar"));
+ Assertions.assertTrue(command3.contains("arg1"));
+ Assertions.assertTrue(command3.contains("arg2"));
+
+ SparkJobTemplate template4 =
+ SparkJobTemplate.builder()
+ .withName("template4")
+ .withExecutable("/path/to/spark-demo.jar")
+ .withArguments(Lists.newArrayList("arg1", "arg2"))
+ .withClassName("com.example.MainClass")
+ .withConfigs(
+ ImmutableMap.of(
+ "spark.executor.memory", "2g",
+ "spark.executor.cores", "2"))
+ .build();
+
+ List<String> command4 =
+ SparkProcessBuilder.generateSparkSubmitCommand(sparkSubmitPath,
template4);
+
+ Assertions.assertEquals(sparkSubmitPath, command4.get(0));
+ Assertions.assertTrue(command4.contains("--class"));
+
+ int idx4 = command4.indexOf("--class");
+ Assertions.assertEquals("com.example.MainClass", command4.get(idx4 + 1));
+
+ Assertions.assertTrue(command4.contains("--conf"));
+ int idxConf1 = command4.indexOf("--conf");
+ Assertions.assertEquals("spark.executor.memory=2g", command4.get(idxConf1
+ 1));
+
+ int idxConf2 = command4.lastIndexOf("--conf");
+ Assertions.assertEquals("spark.executor.cores=2", command4.get(idxConf2 +
1));
+
+ Assertions.assertTrue(command4.contains("/path/to/spark-demo.jar"));
+ Assertions.assertTrue(command4.contains("arg1"));
+ Assertions.assertTrue(command4.contains("arg2"));
+ }
+}
diff --git a/docs/manage-jobs-in-gravitino.md b/docs/manage-jobs-in-gravitino.md
index 6d1a52236f..271dc6019d 100644
--- a/docs/manage-jobs-in-gravitino.md
+++ b/docs/manage-jobs-in-gravitino.md
@@ -99,9 +99,6 @@ Please note that:
The `spark` job template is used to run Spark jobs, it is a Spark application
JAR file for now.
-**Note** that the Spark job support is still under development, in 1.0.0, it
only supports
-registering a Spark job template, running a Spark job is not supported yet.
-
The template is defined as follows:
```json
@@ -144,7 +141,8 @@ Here is a brief description of the fields in the Spark job
template:
- `customFields`: Custom fields for the job template, which can be used to
store additional information.
It can use placeholders like `{{value1}}` and `{{value2}}` to be replaced
with actual values
when running the job.
-- `className`: The main class of the Spark application, it is required for
Spark job template.
+- `className`: The main class of the Spark application. It is required for
Java/Scala Spark
+  applications. For PySpark applications, this field can be `null` instead.
- `jars`: A list of JAR files to add to the Spark job classpath, which can be
a local file path or a URL
with a supported scheme.
- `files`: A list of files to be copied to the working directory of the Spark
job, which can be a local
@@ -168,8 +166,8 @@ Note that:
3. Gravitino will copy the `executable`, `jars`, `files`, and `archives` files
to the job working
directory when running the job, so you can use the relative path in the
`executable`, `jars`,
`files`, and `archives` to refer to other files in the job working
directory.
-4. The `className` is required for the Spark job template, it is the main
class of the Spark
- application to be executed.
+4. The `className` is required for the Java and Scala Spark job templates; it
is the main class of
+   the Spark application to be executed. For a PySpark job template, this field
can be `null` instead.
To register a job template, you can use REST API or the Java and Python SDKs.
Here is the
example to register a shell job template:
@@ -543,11 +541,12 @@ default configurations:
The local job executor is used for testing and development purposes, it runs
the job in the local process.
The following are the default configurations for the local job executor:
-| Property name | Description
| Default value
| Required | Since Version |
-|-----------------------------------------------------|---------------------------------------------------------------------------|----------------------------------------|----------|---------------|
-| `gravitino.jobExecutor.local.waitingQueueSize` | The size of the
waiting queue for queued jobs in the local job executor | `100`
| No | 1.0.0 |
-| `gravitino.jobExecutor.local.maxRunningJobs` | The maximum number of
running jobs in the local job executor | `max(1, min(available
cores / 2, 10))` | No | 1.0.0 |
-| `gravitino.jobExecutor.local.jobStatusKeepTimeInMs` | The time in
milliseconds to keep the job status in the local job executor | `3600000` (1
hour) | No | 1.0.0 |
+| Property name | Description
| Default value
| Required | Since Version |
+|-----------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------|----------------------------------------|----------|---------------|
+| `gravitino.jobExecutor.local.waitingQueueSize` | The size of the
waiting queue for queued jobs in the local job executor
| `100`
| No | 1.0.0 |
+| `gravitino.jobExecutor.local.maxRunningJobs` | The maximum number of
running jobs in the local job executor
| `max(1, min(available cores / 2,
10))` | No | 1.0.0 |
+| `gravitino.jobExecutor.local.jobStatusKeepTimeInMs` | The time in
milliseconds to keep the job status in the local job executor
| `3600000` (1 hour)
| No | 1.0.0 |
+| `gravitino.jobExecutor.local.sparkHome`             | The home directory of
Spark. Gravitino checks this configuration first and then the `SPARK_HOME` env.
Either of them should be set to run a Spark job | `None`
| No       | 1.0.1         |
### Implement a custom job executor