This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/spark-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new ccb6beb  [SPARK-53869] Support multiple files in `pyFiles` field
ccb6beb is described below

commit ccb6beb985f7614ffec4f896fc55a3db67d54bb0
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Fri Oct 10 11:48:44 2025 -0700

    [SPARK-53869] Support multiple files in `pyFiles` field
    
    ### What changes were proposed in this pull request?
    
    This PR aims to support multiple files in `SparkApplication`'s `pyFiles` field.
    
    ### Why are the changes needed?
    
    Currently, `pyFiles` is mapped to the main resource directly because it assumes a single Python file.
    
    
https://github.com/apache/spark-kubernetes-operator/blob/75515c752086853b1676cff39c7fc84b99163dc0/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorker.java#L143
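    
    For reference, these are the pre-existing (and unchanged) context lines, also visible in the diff below; the entire, possibly comma-separated, `pyFiles` string becomes the primary resource:
    
    ```java
    } else if (StringUtils.isNotEmpty(applicationSpec.getPyFiles())) {
      primaryResource = new PythonMainAppResource(applicationSpec.getPyFiles());
      effectiveSparkConf.setIfMissing("spark.submit.pyFiles", applicationSpec.getPyFiles());
    }
    ```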
    
    However, it is supposed to be a comma-separated string. If users provide multiple files, it causes a failure like the following.
    
    ```
    python3: can't open file '/opt/spark/examples/src/main/python/pi.py,local:///opt/spark/examples/src/main/python/sort.py': [Errno 2]
    ```
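    
    The failure happens because the whole comma-separated value reaches `python3` as a single script path. With plain `spark-submit`, by contrast, the primary script is a separate positional argument and additional files go to `--py-files`; an illustrative invocation (not part of this patch):
    
    ```
    spark-submit \
      --py-files local:///opt/spark/examples/src/main/python/sort.py \
      local:///opt/spark/examples/src/main/python/pi.py
    ```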
    
    This PR proposes a mitigation that handles the first file of `pyFiles` as the primary resource and the remaining files as the actual `pyFiles`. Note that the previous logic works without any change; the new logic applies only when `mainClass` is additionally set to `org.apache.spark.deploy.PythonRunner`.
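    
    The essence of the new branch is a single two-way split of the `pyFiles` string (see the diff below for the exact context):
    
    ```java
    // Split once on the first comma: the head becomes the primary resource,
    // the tail (still comma-separated) becomes spark.submit.pyFiles.
    String[] files = applicationSpec.getPyFiles().split(",", 2);
    primaryResource = new PythonMainAppResource(files[0]);
    if (files.length > 1 && !files[1].isBlank()) {
      effectiveSparkConf.setIfMissing("spark.submit.pyFiles", files[1]);
    }
    ```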
    
    **BEFORE**
    
    ```yaml
    spec:
      pyFiles: "local:///opt/spark/examples/src/main/python/pi.py"
    ```
    
    **AFTER**
    
    ```yaml
    spec:
      mainClass: "org.apache.spark.deploy.PythonRunner"
      pyFiles: "local:///opt/spark/examples/src/main/python/pi.py,local:///opt/spark/examples/src/main/python/lib.py"
    ```
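    
    With the new logic, the AFTER example above resolves to the following effective settings:
    
    ```
    primary resource     : local:///opt/spark/examples/src/main/python/pi.py
    spark.submit.pyFiles : local:///opt/spark/examples/src/main/python/lib.py
    ```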
    
    ### Does this PR introduce _any_ user-facing change?
    
    No behavior change, because the new logic works only when `mainClass` is `org.apache.spark.deploy.PythonRunner`.
    
    ### How was this patch tested?
    
    Pass the CIs with a newly added test case.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #382 from dongjoon-hyun/SPARK-53869.
    
    Authored-by: Dongjoon Hyun <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../k8s/operator/SparkAppSubmissionWorker.java     |  6 +++++
 .../k8s/operator/SparkAppSubmissionWorkerTest.java | 28 ++++++++++++++++++++++
 2 files changed, 34 insertions(+)

diff --git a/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorker.java b/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorker.java
index 7c5fb98..04beda0 100644
--- a/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorker.java
+++ b/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorker.java
@@ -139,6 +139,12 @@ public class SparkAppSubmissionWorker {
     if (StringUtils.isNotEmpty(applicationSpec.getJars())) {
       primaryResource = new JavaMainAppResource(Option.apply(applicationSpec.getJars()));
       effectiveSparkConf.setIfMissing("spark.jars", applicationSpec.getJars());
+    } else if ("org.apache.spark.deploy.PythonRunner".equals(applicationSpec.getMainClass())) {
+      String[] files = applicationSpec.getPyFiles().split(",", 2);
+      primaryResource = new PythonMainAppResource(files[0]);
+      if (files.length > 1 && !files[1].isBlank()) {
+        effectiveSparkConf.setIfMissing("spark.submit.pyFiles", files[1]);
+      }
     } else if (StringUtils.isNotEmpty(applicationSpec.getPyFiles())) {
       primaryResource = new PythonMainAppResource(applicationSpec.getPyFiles());
       effectiveSparkConf.setIfMissing("spark.submit.pyFiles", applicationSpec.getPyFiles());
diff --git a/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorkerTest.java b/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorkerTest.java
index 19c6787..4376b28 100644
--- a/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorkerTest.java
+++ b/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorkerTest.java
@@ -129,6 +129,34 @@ class SparkAppSubmissionWorkerTest {
     }
   }
 
+  @Test
+  void handlePyFiles() {
+    Map<SparkAppDriverConf, List<Object>> constructorArgs = new HashMap<>();
+    try (MockedConstruction<SparkAppDriverConf> mocked =
+        mockConstruction(
+            SparkAppDriverConf.class,
+            (mock, context) -> constructorArgs.put(mock, new ArrayList<>(context.arguments())))) {
+      SparkApplication mockApp = mock(SparkApplication.class);
+      ApplicationSpec mockSpec = mock(ApplicationSpec.class);
+      ObjectMeta appMeta = new ObjectMetaBuilder().withName("app1").withNamespace("ns1").build();
+      when(mockApp.getSpec()).thenReturn(mockSpec);
+      when(mockApp.getMetadata()).thenReturn(appMeta);
+      when(mockSpec.getMainClass()).thenReturn("org.apache.spark.deploy.PythonRunner");
+      when(mockSpec.getPyFiles()).thenReturn("main.py,lib.py");
+
+      SparkAppSubmissionWorker submissionWorker = new SparkAppSubmissionWorker();
+      SparkAppDriverConf conf = submissionWorker.buildDriverConf(mockApp, Collections.emptyMap());
+      assertEquals(6, constructorArgs.get(conf).size());
+      assertEquals(
+          "lib.py", ((SparkConf) constructorArgs.get(conf).get(0)).get("spark.submit.pyFiles"));
+
+      // validate main resources
+      assertInstanceOf(PythonMainAppResource.class, constructorArgs.get(conf).get(2));
+      PythonMainAppResource mainResource = (PythonMainAppResource) constructorArgs.get(conf).get(2);
+      assertEquals("main.py", mainResource.primaryResource());
+    }
+  }
+
   @Test
   void buildDriverConfForRApp() {
     Map<SparkAppDriverConf, List<Object>> constructorArgs = new HashMap<>();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
