This is an automated email from the ASF dual-hosted git repository.

sandy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new c4ffb7b31169 [SPARK-52431][SDP] Finishing touches on Declarative Pipelines runner
c4ffb7b31169 is described below

commit c4ffb7b3116900f21078f3fcf03cd8fa37f2fd1c
Author: Sandy Ryza <sandyr...@gmail.com>
AuthorDate: Fri Jun 13 09:39:19 2025 -0700

    [SPARK-52431][SDP] Finishing touches on Declarative Pipelines runner
    
    ### What changes were proposed in this pull request?
    
    Adds finishing touches to the Declarative Pipelines CLI entrypoint.
    
    - Makes it behave like a standard spark-submit application.
    - Enables running `spark-pipelines` against common cluster managers.
    - Enables running `spark-pipelines` without the `--remote` flag (see the routing sketch below).
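
    As a rough illustration (not part of the committed code), the sketch below feeds a mixed argument list to the new `SparkPipelines.constructSparkSubmitArgs` helper. The expected output mirrors the assertions in `SparkPipelinesSuite`; the `/opt/spark` SPARK_HOME and the `PipelinesArgRoutingSketch` object name are purely hypothetical:

    ```scala
    // Sketch only: constructSparkSubmitArgs is protected[deploy], so this has to
    // live in the org.apache.spark.deploy package; "/opt/spark" is a stand-in
    // for a real SPARK_HOME.
    package org.apache.spark.deploy

    object PipelinesArgRoutingSketch {
      def main(args: Array[String]): Unit = {
        val cliArgs = Array(
          "--remote", "local[2]",         // spark-submit side: captured as the Connect remote
          "--conf", "spark.conf1=2",      // spark-submit side: forwarded as-is
          "run", "--spec", "pipeline.yml" // pipeline side: handed to the Python CLI
        )
        val submitArgs =
          SparkPipelines.constructSparkSubmitArgs(cliArgs, sparkHome = "/opt/spark")
        // Expected, mirroring SparkPipelinesSuite:
        //   --conf spark.conf1=2 --conf spark.api.mode=connect --remote local[2]
        //   /opt/spark/python/pyspark/pipelines/cli.py run --spec pipeline.yml
        println(submitArgs.mkString(" "))
      }
    }
    ```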
    
    ### Why are the changes needed?
    
    Previously, `bin/spark-pipelines` invoked the Python CLI directly and required an explicit `--remote`, so it could not be launched like other spark-submit applications or run against common cluster managers.
    
    ### Does this PR introduce _any_ user-facing change?
    
    ### How was this patch tested?
    
    - Added unit tests
    - Ran the `spark-pipelines` CLI with various combinations of spark-submit and pipeline arguments (the default no-remote combination is sketched below)
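
    For example, the default no-remote invocation `bin/spark-pipelines run --spec pipeline.yml` reduces to a plain spark-submit of the inner Python CLI. A minimal sketch of that delegation, assuming a hypothetical `/opt/spark` SPARK_HOME and a hypothetical object name; the argument order mirrors `constructSparkSubmitArgs`:

    ```scala
    // Hedged sketch of what SparkPipelines.main effectively hands to SparkSubmit
    // when no --remote flag is given; the /opt/spark path and the object name
    // are hypothetical.
    package org.apache.spark.deploy

    object PipelinesDelegationSketch {
      def main(ignored: Array[String]): Unit = {
        SparkSubmit.main(Array(
          "--conf", "spark.api.mode=connect",           // always forced to Spark Connect
          "--remote", "local",                          // default when --remote is omitted
          "/opt/spark/python/pyspark/pipelines/cli.py", // inner Python CLI as the primary resource
          "run", "--spec", "pipeline.yml"               // pipeline-specific args, passed through
        ))
      }
    }
    ```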
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #51165 from sryza/entrypoint-ft.
    
    Lead-authored-by: Sandy Ryza <sandyr...@gmail.com>
    Co-authored-by: Sandy Ryza <sandy.r...@databricks.com>
    Signed-off-by: Sandy Ryza <sandy.r...@databricks.com>
---
 bin/spark-pipelines                                |   2 +-
 .../org/apache/spark/deploy/SparkPipelines.scala   | 106 ++++++++++++++++
 .../apache/spark/deploy/SparkPipelinesSuite.scala  | 137 +++++++++++++++++++++
 python/pyspark/pipelines/cli.py                    |  11 +-
 4 files changed, 248 insertions(+), 8 deletions(-)

diff --git a/bin/spark-pipelines b/bin/spark-pipelines
index 52baeeafab08..2174df7bed69 100755
--- a/bin/spark-pipelines
+++ b/bin/spark-pipelines
@@ -30,4 +30,4 @@ fi
 export PYTHONPATH="${SPARK_HOME}/python/:$PYTHONPATH"
 export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.10.9.9-src.zip:$PYTHONPATH"
 
-$PYSPARK_PYTHON "${SPARK_HOME}"/python/pyspark/pipelines/cli.py "$@"
+exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkPipelines 
"$@"
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkPipelines.scala b/core/src/main/scala/org/apache/spark/deploy/SparkPipelines.scala
new file mode 100644
index 000000000000..edad8901a030
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkPipelines.scala
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+import java.util
+
+import scala.collection.mutable.ArrayBuffer
+import scala.jdk.CollectionConverters._
+
+import org.apache.spark.SparkUserAppException
+import org.apache.spark.internal.Logging
+import org.apache.spark.launcher.SparkSubmitArgumentsParser
+
+/**
+ * Outer implementation of the spark-pipelines command line interface. Responsible for routing
+ * spark-submit args to spark-submit, and pipeline-specific args to the inner Python CLI
+ * implementation that loads the user code and submits it to the backend.
+ */
+object SparkPipelines extends Logging {
+  def main(args: Array[String]): Unit = {
+    val sparkHome = sys.env("SPARK_HOME")
+    SparkSubmit.main(constructSparkSubmitArgs(args, sparkHome).toArray)
+  }
+
+  protected[deploy] def constructSparkSubmitArgs(
+      args: Array[String],
+      sparkHome: String): Seq[String] = {
+    val (sparkSubmitArgs, pipelinesArgs) = splitArgs(args)
+    val pipelinesCliFile = s"$sparkHome/python/pyspark/pipelines/cli.py"
+    (sparkSubmitArgs ++ Seq(pipelinesCliFile) ++ pipelinesArgs)
+  }
+
+  /**
+   * Split the arguments into spark-submit args (--master, --remote, etc.) and pipeline args
+   * (run, --spec, etc.).
+   */
+  private def splitArgs(args: Array[String]): (Seq[String], Seq[String]) = {
+    val sparkSubmitArgs = new ArrayBuffer[String]()
+    val pipelinesArgs = new ArrayBuffer[String]()
+    var remote = "local"
+
+    new SparkSubmitArgumentsParser() {
+      parse(util.Arrays.asList(args: _*))
+
+      override protected def handle(opt: String, value: String): Boolean = {
+        if (opt == "--remote") {
+          remote = value
+        } else if (opt == "--class") {
+          logInfo("--class argument not supported.")
+          throw SparkUserAppException(1)
+        } else if (opt == "--conf" &&
+          value.startsWith("spark.api.mode=") &&
+          value != "spark.api.mode=connect") {
+          logInfo(
+            "--spark.api.mode must be 'connect'. " +
+            "Declarative Pipelines currently only supports Spark Connect."
+          )
+          throw SparkUserAppException(1)
+        } else if (Seq("--name", "-h", "--help").contains(opt)) {
+          pipelinesArgs += opt
+          if (value != null && value.nonEmpty) {
+            pipelinesArgs += value
+          }
+        } else {
+          sparkSubmitArgs += opt
+          if (value != null) {
+            sparkSubmitArgs += value
+          }
+        }
+
+        true
+      }
+
+      override protected def handleExtraArgs(extra: util.List[String]): Unit = {
+        pipelinesArgs.appendAll(extra.asScala)
+      }
+
+      override protected def handleUnknown(opt: String): Boolean = {
+        pipelinesArgs += opt
+        true
+      }
+    }
+
+    sparkSubmitArgs += "--conf"
+    sparkSubmitArgs += "spark.api.mode=connect"
+    sparkSubmitArgs += "--remote"
+    sparkSubmitArgs += remote
+    (sparkSubmitArgs.toSeq, pipelinesArgs.toSeq)
+  }
+
+}
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkPipelinesSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkPipelinesSuite.scala
new file mode 100644
index 000000000000..a482eaa42c35
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkPipelinesSuite.scala
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+import org.scalatest.BeforeAndAfterEach
+import org.scalatest.matchers.must.Matchers
+
+import org.apache.spark.SparkUserAppException
+
+class SparkPipelinesSuite extends SparkSubmitTestUtils with Matchers with BeforeAndAfterEach {
+  test("only spark submit args") {
+    val args = Array(
+      "--remote",
+      "local[2]",
+      "--deploy-mode",
+      "client",
+      "--supervise",
+      "--conf",
+      "spark.conf1=2",
+      "--conf",
+      "spark.conf2=3"
+    )
+    assert(
+      SparkPipelines.constructSparkSubmitArgs(args, sparkHome = "abc") ==
+      Seq(
+        "--deploy-mode",
+        "client",
+        "--supervise",
+        "--conf",
+        "spark.conf1=2",
+        "--conf",
+        "spark.conf2=3",
+        "--conf",
+        "spark.api.mode=connect",
+        "--remote",
+        "local[2]",
+        "abc/python/pyspark/pipelines/cli.py"
+      )
+    )
+  }
+
+  test("only pipelines args") {
+    val args = Array(
+      "run",
+      "--spec",
+      "pipeline.yml"
+    )
+    assert(
+      SparkPipelines.constructSparkSubmitArgs(args, sparkHome = "abc") ==
+      Seq(
+        "--conf",
+        "spark.api.mode=connect",
+        "--remote",
+        "local",
+        "abc/python/pyspark/pipelines/cli.py",
+        "run",
+        "--spec",
+        "pipeline.yml"
+      )
+    )
+  }
+
+  test("spark-submit and pipelines args") {
+    val args = Array(
+      "--remote",
+      "local[2]",
+      "run",
+      "--supervise",
+      "--spec",
+      "pipeline.yml",
+      "--conf",
+      "spark.conf2=3"
+    )
+    assert(
+      SparkPipelines.constructSparkSubmitArgs(args, sparkHome = "abc") ==
+      Seq(
+        "--supervise",
+        "--conf",
+        "spark.conf2=3",
+        "--conf",
+        "spark.api.mode=connect",
+        "--remote",
+        "local[2]",
+        "abc/python/pyspark/pipelines/cli.py",
+        "run",
+        "--spec",
+        "pipeline.yml"
+      )
+    )
+  }
+
+  test("class arg prohibited") {
+    val args = Array(
+      "--class",
+      "org.apache.spark.deploy.SparkPipelines"
+    )
+    intercept[SparkUserAppException] {
+      SparkPipelines.constructSparkSubmitArgs(args, sparkHome = "abc")
+    }
+  }
+
+  test("name arg") {
+    val args = Array(
+      "init",
+      "--name",
+      "myproject"
+    )
+    assert(
+      SparkPipelines.constructSparkSubmitArgs(args, sparkHome = "abc") ==
+        Seq(
+          "--conf",
+          "spark.api.mode=connect",
+          "--remote",
+          "local",
+          "abc/python/pyspark/pipelines/cli.py",
+          "init",
+          "--name",
+          "myproject"
+        )
+    )
+  }
+}
diff --git a/python/pyspark/pipelines/cli.py b/python/pyspark/pipelines/cli.py
index 824c2a5fff16..f4c053858d14 100644
--- a/python/pyspark/pipelines/cli.py
+++ b/python/pyspark/pipelines/cli.py
@@ -205,17 +205,17 @@ def change_dir(path: Path) -> Generator[None, None, None]:
         os.chdir(prev)
 
 
-def run(spec_path: Path, remote: str) -> None:
+def run(spec_path: Path) -> None:
     """Run the pipeline defined with the given spec."""
     log_with_curr_timestamp(f"Loading pipeline spec from {spec_path}...")
     spec = load_pipeline_spec(spec_path)
 
     log_with_curr_timestamp("Creating Spark session...")
-    spark_builder = SparkSession.builder.remote(remote)
+    spark_builder = SparkSession.builder
     for key, value in spec.configuration.items():
         spark_builder = spark_builder.config(key, value)
 
-    spark = spark_builder.create()
+    spark = spark_builder.getOrCreate()
 
     log_with_curr_timestamp("Creating dataflow graph...")
     dataflow_graph_id = create_dataflow_graph(
@@ -244,9 +244,6 @@ if __name__ == "__main__":
     # "run" subcommand
     run_parser = subparsers.add_parser("run", help="Run a pipeline.")
     run_parser.add_argument("--spec", help="Path to the pipeline spec.")
-    run_parser.add_argument(
-        "--remote", help="The Spark Connect remote to connect to.", 
required=True
-    )
 
     # "init" subcommand
     init_parser = subparsers.add_parser(
@@ -274,6 +271,6 @@ if __name__ == "__main__":
         else:
             spec_path = find_pipeline_spec(Path.cwd())
 
-        run(spec_path=spec_path, remote=args.remote)
+        run(spec_path=spec_path)
     elif args.command == "init":
         init(args.name)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
