This is an automated email from the ASF dual-hosted git repository.

sandy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new c4ffb7b31169 [SPARK-52431][SDP] Finishing touches on Declarative Pipelines runner
c4ffb7b31169 is described below

commit c4ffb7b3116900f21078f3fcf03cd8fa37f2fd1c
Author: Sandy Ryza <sandyr...@gmail.com>
AuthorDate: Fri Jun 13 09:39:19 2025 -0700

    [SPARK-52431][SDP] Finishing touches on Declarative Pipelines runner

    ### What changes were proposed in this pull request?

    Adds finishing touches to the Declarative Pipelines CLI entrypoint.
    - Makes it behave like a standard spark-submitted application.
    - Enables running `spark-pipelines` against common cluster managers.
    - Enables running `spark-pipelines` without the remote flag.

    ### Why are the changes needed?

    ### Does this PR introduce _any_ user-facing change?

    ### How was this patch tested?

    - Added unit tests
    - Ran the `spark-pipelines` CLI with various combinations

    ### Was this patch authored or co-authored using generative AI tooling?

    No

    Closes #51165 from sryza/entrypoint-ft.

    Lead-authored-by: Sandy Ryza <sandyr...@gmail.com>
    Co-authored-by: Sandy Ryza <sandy.r...@databricks.com>
    Signed-off-by: Sandy Ryza <sandy.r...@databricks.com>
---
 bin/spark-pipelines                                |   2 +-
 .../org/apache/spark/deploy/SparkPipelines.scala   | 106 ++++++++++++++++
 .../apache/spark/deploy/SparkPipelinesSuite.scala  | 137 +++++++++++++++++++++
 python/pyspark/pipelines/cli.py                    |  11 +-
 4 files changed, 248 insertions(+), 8 deletions(-)

diff --git a/bin/spark-pipelines b/bin/spark-pipelines
index 52baeeafab08..2174df7bed69 100755
--- a/bin/spark-pipelines
+++ b/bin/spark-pipelines
@@ -30,4 +30,4 @@ fi
 export PYTHONPATH="${SPARK_HOME}/python/:$PYTHONPATH"
 export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.10.9.9-src.zip:$PYTHONPATH"
 
-$PYSPARK_PYTHON "${SPARK_HOME}"/python/pyspark/pipelines/cli.py "$@"
+exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkPipelines "$@"
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkPipelines.scala b/core/src/main/scala/org/apache/spark/deploy/SparkPipelines.scala
new file mode 100644
index 000000000000..edad8901a030
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkPipelines.scala
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+import java.util
+
+import scala.collection.mutable.ArrayBuffer
+import scala.jdk.CollectionConverters._
+
+import org.apache.spark.SparkUserAppException
+import org.apache.spark.internal.Logging
+import org.apache.spark.launcher.SparkSubmitArgumentsParser
+
+/**
+ * Outer implementation of the spark-pipelines command line interface. Responsible for routing
+ * spark-submit args to spark-submit, and pipeline-specific args to the inner Python CLI
+ * implementation that loads the user code and submits it to the backend.
+ */
+object SparkPipelines extends Logging {
+  def main(args: Array[String]): Unit = {
+    val sparkHome = sys.env("SPARK_HOME")
+    SparkSubmit.main(constructSparkSubmitArgs(args, sparkHome).toArray)
+  }
+
+  protected[deploy] def constructSparkSubmitArgs(
+      args: Array[String],
+      sparkHome: String): Seq[String] = {
+    val (sparkSubmitArgs, pipelinesArgs) = splitArgs(args)
+    val pipelinesCliFile = s"$sparkHome/python/pyspark/pipelines/cli.py"
+    (sparkSubmitArgs ++ Seq(pipelinesCliFile) ++ pipelinesArgs)
+  }
+
+  /**
+   * Split the arguments into spark-submit args (--master, --remote, etc.) and pipeline args
+   * (run, --spec, etc.).
+   */
+  private def splitArgs(args: Array[String]): (Seq[String], Seq[String]) = {
+    val sparkSubmitArgs = new ArrayBuffer[String]()
+    val pipelinesArgs = new ArrayBuffer[String]()
+    var remote = "local"
+
+    new SparkSubmitArgumentsParser() {
+      parse(util.Arrays.asList(args: _*))
+
+      override protected def handle(opt: String, value: String): Boolean = {
+        if (opt == "--remote") {
+          remote = value
+        } else if (opt == "--class") {
+          logInfo("--class argument not supported.")
+          throw SparkUserAppException(1)
+        } else if (opt == "--conf" &&
+          value.startsWith("spark.api.mode=") &&
+          value != "spark.api.mode=connect") {
+          logInfo(
+            "--spark.api.mode must be 'connect'. " +
+              "Declarative Pipelines currently only supports Spark Connect."
+          )
+          throw SparkUserAppException(1)
+        } else if (Seq("--name", "-h", "--help").contains(opt)) {
+          pipelinesArgs += opt
+          if (value != null && value.nonEmpty) {
+            pipelinesArgs += value
+          }
+        } else {
+          sparkSubmitArgs += opt
+          if (value != null) {
+            sparkSubmitArgs += value
+          }
+        }
+
+        true
+      }
+
+      override protected def handleExtraArgs(extra: util.List[String]): Unit = {
+        pipelinesArgs.appendAll(extra.asScala)
+      }
+
+      override protected def handleUnknown(opt: String): Boolean = {
+        pipelinesArgs += opt
+        true
+      }
+    }
+
+    sparkSubmitArgs += "--conf"
+    sparkSubmitArgs += "spark.api.mode=connect"
+    sparkSubmitArgs += "--remote"
+    sparkSubmitArgs += remote
+    (sparkSubmitArgs.toSeq, pipelinesArgs.toSeq)
+  }
+
+}
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkPipelinesSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkPipelinesSuite.scala
new file mode 100644
index 000000000000..a482eaa42c35
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkPipelinesSuite.scala
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+import org.scalatest.BeforeAndAfterEach
+import org.scalatest.matchers.must.Matchers
+
+import org.apache.spark.SparkUserAppException
+
+class SparkPipelinesSuite extends SparkSubmitTestUtils with Matchers with BeforeAndAfterEach {
+  test("only spark submit args") {
+    val args = Array(
+      "--remote",
+      "local[2]",
+      "--deploy-mode",
+      "client",
+      "--supervise",
+      "--conf",
+      "spark.conf1=2",
+      "--conf",
+      "spark.conf2=3"
+    )
+    assert(
+      SparkPipelines.constructSparkSubmitArgs(args, sparkHome = "abc") ==
+      Seq(
+        "--deploy-mode",
+        "client",
+        "--supervise",
+        "--conf",
+        "spark.conf1=2",
+        "--conf",
+        "spark.conf2=3",
+        "--conf",
+        "spark.api.mode=connect",
+        "--remote",
+        "local[2]",
+        "abc/python/pyspark/pipelines/cli.py"
+      )
+    )
+  }
+
+  test("only pipelines args") {
+    val args = Array(
+      "run",
+      "--spec",
+      "pipeline.yml"
+    )
+    assert(
+      SparkPipelines.constructSparkSubmitArgs(args, sparkHome = "abc") ==
+      Seq(
+        "--conf",
+        "spark.api.mode=connect",
+        "--remote",
+        "local",
+        "abc/python/pyspark/pipelines/cli.py",
+        "run",
+        "--spec",
+        "pipeline.yml"
+      )
+    )
+  }
+
+  test("spark-submit and pipelines args") {
+    val args = Array(
+      "--remote",
+      "local[2]",
+      "run",
+      "--supervise",
+      "--spec",
+      "pipeline.yml",
+      "--conf",
+      "spark.conf2=3"
+    )
+    assert(
+      SparkPipelines.constructSparkSubmitArgs(args, sparkHome = "abc") ==
+      Seq(
+        "--supervise",
+        "--conf",
+        "spark.conf2=3",
+        "--conf",
+        "spark.api.mode=connect",
+        "--remote",
+        "local[2]",
+        "abc/python/pyspark/pipelines/cli.py",
+        "run",
+        "--spec",
+        "pipeline.yml"
+      )
+    )
+  }
+
+  test("class arg prohibited") {
+    val args = Array(
+      "--class",
+      "org.apache.spark.deploy.SparkPipelines"
+    )
+    intercept[SparkUserAppException] {
+      SparkPipelines.constructSparkSubmitArgs(args, sparkHome = "abc")
+    }
+  }
+
+  test("name arg") {
+    val args = Array(
+      "init",
+      "--name",
+      "myproject"
+    )
+    assert(
+      SparkPipelines.constructSparkSubmitArgs(args, sparkHome = "abc") ==
+      Seq(
+        "--conf",
+        "spark.api.mode=connect",
+        "--remote",
+        "local",
+        "abc/python/pyspark/pipelines/cli.py",
+        "init",
+        "--name",
+        "myproject"
+      )
    )
+  }
+}
diff --git a/python/pyspark/pipelines/cli.py b/python/pyspark/pipelines/cli.py
index 824c2a5fff16..f4c053858d14 100644
--- a/python/pyspark/pipelines/cli.py
+++ b/python/pyspark/pipelines/cli.py
@@ -205,17 +205,17 @@ def change_dir(path: Path) -> Generator[None, None, None]:
         os.chdir(prev)
 
 
-def run(spec_path: Path, remote: str) -> None:
+def run(spec_path: Path) -> None:
     """Run the pipeline defined with the given spec."""
     log_with_curr_timestamp(f"Loading pipeline spec from {spec_path}...")
     spec = load_pipeline_spec(spec_path)
 
     log_with_curr_timestamp("Creating Spark session...")
-    spark_builder = SparkSession.builder.remote(remote)
+    spark_builder = SparkSession.builder
     for key, value in spec.configuration.items():
         spark_builder = spark_builder.config(key, value)
-    spark = spark_builder.create()
+    spark = spark_builder.getOrCreate()
 
     log_with_curr_timestamp("Creating dataflow graph...")
     dataflow_graph_id = create_dataflow_graph(
@@ -244,9 +244,6 @@ if __name__ == "__main__":
     # "run" subcommand
     run_parser = subparsers.add_parser("run", help="Run a pipeline.")
     run_parser.add_argument("--spec", help="Path to the pipeline spec.")
-    run_parser.add_argument(
-        "--remote", help="The Spark Connect remote to connect to.", required=True
-    )
 
     # "init" subcommand
     init_parser = subparsers.add_parser(
@@ -274,6 +271,6 @@ if __name__ == "__main__":
         else:
             spec_path = find_pipeline_spec(Path.cwd())
 
-        run(spec_path=spec_path, remote=args.remote)
+        run(spec_path=spec_path)
     elif args.command == "init":
         init(args.name)

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
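For readers trying out the new entrypoint, a minimal usage sketch based on the arguments
exercised in SparkPipelinesSuite above (the spec path and the local[2] master URL are
illustrative, not prescribed by the commit):

    # Run a pipeline; --remote is no longer required and defaults to "local".
    bin/spark-pipelines run --spec pipeline.yml

    # spark-submit style args (e.g. --remote) are routed to spark-submit, while
    # pipeline args (run, --spec) are passed through to the inner Python CLI.
    bin/spark-pipelines --remote local[2] run --spec pipeline.yml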