This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/spark-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new ccb6beb [SPARK-53869] Support multiple files in `pyFiles` field
ccb6beb is described below
commit ccb6beb985f7614ffec4f896fc55a3db67d54bb0
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Fri Oct 10 11:48:44 2025 -0700
[SPARK-53869] Support multiple files in `pyFiles` field
### What changes were proposed in this pull request?
This PR aims to support multiple files in `SparkApplication`'s `pyFiles`
field.
### Why are the changes needed?
Currently, `pyFiles` is mapped to the main resource directly because it
assumes a single Python file.
https://github.com/apache/spark-kubernetes-operator/blob/75515c752086853b1676cff39c7fc84b99163dc0/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorker.java#L143
However, `pyFiles` is supposed to be a comma-separated string. If users provide
multiple files, it causes a failure like the following.
```
python3: can't open file '/opt/spark/examples/src/main/python/pi.py,local:///opt/spark/examples/src/main/python/sort.py': [Errno 2]
```
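For illustration, a standalone sketch (hypothetical values, not operator code) of why the joined value fails: the old mapping hands the entire comma-separated string to the interpreter as a single filename.
```java
public class JoinedPathDemo {
  public static void main(String[] args) {
    // Hypothetical pyFiles value with two entries.
    String pyFiles = "/opt/spark/examples/src/main/python/pi.py,"
        + "local:///opt/spark/examples/src/main/python/sort.py";
    // The old logic used the whole value as the primary resource, so the
    // driver effectively ran: python3 '<pyFiles>', which fails because
    // no file has the comma-joined name.
    System.out.println("python3 '" + pyFiles + "'");
  }
}
```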
This PR proposes a mitigation that handles the first file of `pyFiles` as the
primary resource and the rest of the files as the real `pyFiles`. Note that the
previous logic works without any change; the new logic is applied only when
`mainClass` is additionally specified as `org.apache.spark.deploy.PythonRunner`.
**BEFORE**
```yaml
spec:
  pyFiles: "local:///opt/spark/examples/src/main/python/pi.py"
```
**AFTER**
```yaml
spec:
  mainClass: "org.apache.spark.deploy.PythonRunner"
  pyFiles: "local:///opt/spark/examples/src/main/python/pi.py,local:///opt/spark/examples/src/main/python/lib.py"
```
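For reference, a minimal standalone sketch (hypothetical values, not operator code) of the `split(",", 2)` semantics the new logic relies on: the limit of 2 caps the result at two elements, so the first entry becomes the primary resource and the remainder, commas intact, passes through as `spark.submit.pyFiles`.
```java
public class PyFilesSplitDemo {
  public static void main(String[] args) {
    // Hypothetical three-file pyFiles value.
    String pyFiles = "local:///opt/spark/examples/src/main/python/pi.py,"
        + "local:///opt/spark/examples/src/main/python/lib.py,"
        + "local:///opt/spark/examples/src/main/python/util.py";
    // split with limit 2 yields at most two elements; the second keeps its commas.
    String[] files = pyFiles.split(",", 2);
    System.out.println(files[0]); // primary resource: .../pi.py
    System.out.println(files[1]); // spark.submit.pyFiles: .../lib.py,.../util.py
  }
}
```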
### Does this PR introduce _any_ user-facing change?
No behavior change, because the new logic works only when `mainClass` is
`org.apache.spark.deploy.PythonRunner`.
### How was this patch tested?
Pass the CIs with the newly added test case.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #382 from dongjoon-hyun/SPARK-53869.
Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../k8s/operator/SparkAppSubmissionWorker.java | 6 +++++
.../k8s/operator/SparkAppSubmissionWorkerTest.java | 28 ++++++++++++++++++++++
2 files changed, 34 insertions(+)
diff --git a/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorker.java b/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorker.java
index 7c5fb98..04beda0 100644
--- a/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorker.java
+++ b/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorker.java
@@ -139,6 +139,12 @@ public class SparkAppSubmissionWorker {
     if (StringUtils.isNotEmpty(applicationSpec.getJars())) {
       primaryResource = new JavaMainAppResource(Option.apply(applicationSpec.getJars()));
       effectiveSparkConf.setIfMissing("spark.jars", applicationSpec.getJars());
+    } else if ("org.apache.spark.deploy.PythonRunner".equals(applicationSpec.getMainClass())) {
+      String[] files = applicationSpec.getPyFiles().split(",", 2);
+      primaryResource = new PythonMainAppResource(files[0]);
+      if (files.length > 1 && !files[1].isBlank()) {
+        effectiveSparkConf.setIfMissing("spark.submit.pyFiles", files[1]);
+      }
     } else if (StringUtils.isNotEmpty(applicationSpec.getPyFiles())) {
       primaryResource = new PythonMainAppResource(applicationSpec.getPyFiles());
       effectiveSparkConf.setIfMissing("spark.submit.pyFiles", applicationSpec.getPyFiles());
diff --git a/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorkerTest.java b/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorkerTest.java
index 19c6787..4376b28 100644
--- a/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorkerTest.java
+++ b/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorkerTest.java
@@ -129,6 +129,34 @@ class SparkAppSubmissionWorkerTest {
     }
   }
 
+  @Test
+  void handlePyFiles() {
+    Map<SparkAppDriverConf, List<Object>> constructorArgs = new HashMap<>();
+    try (MockedConstruction<SparkAppDriverConf> mocked =
+        mockConstruction(
+            SparkAppDriverConf.class,
+            (mock, context) -> constructorArgs.put(mock, new ArrayList<>(context.arguments())))) {
+      SparkApplication mockApp = mock(SparkApplication.class);
+      ApplicationSpec mockSpec = mock(ApplicationSpec.class);
+      ObjectMeta appMeta = new ObjectMetaBuilder().withName("app1").withNamespace("ns1").build();
+      when(mockApp.getSpec()).thenReturn(mockSpec);
+      when(mockApp.getMetadata()).thenReturn(appMeta);
+      when(mockSpec.getMainClass()).thenReturn("org.apache.spark.deploy.PythonRunner");
+      when(mockSpec.getPyFiles()).thenReturn("main.py,lib.py");
+
+      SparkAppSubmissionWorker submissionWorker = new SparkAppSubmissionWorker();
+      SparkAppDriverConf conf = submissionWorker.buildDriverConf(mockApp, Collections.emptyMap());
+      assertEquals(6, constructorArgs.get(conf).size());
+      assertEquals(
+          "lib.py", ((SparkConf) constructorArgs.get(conf).get(0)).get("spark.submit.pyFiles"));
+
+      // validate main resources
+      assertInstanceOf(PythonMainAppResource.class, constructorArgs.get(conf).get(2));
+      PythonMainAppResource mainResource = (PythonMainAppResource) constructorArgs.get(conf).get(2);
+      assertEquals("main.py", mainResource.primaryResource());
+    }
+  }
+
   @Test
   void buildDriverConfForRApp() {
     Map<SparkAppDriverConf, List<Object>> constructorArgs = new HashMap<>();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]