This is an automated email from the ASF dual-hosted git repository. sandy pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 6ab0df9287c5 [SPARK-53044] Change Declarative Pipelines import alias convention from "sdp" to "dp" 6ab0df9287c5 is described below commit 6ab0df9287c5a9ce49769612c2bb0a1daab83bee Author: Sandy Ryza <sandy.r...@databricks.com> AuthorDate: Fri Aug 22 14:21:30 2025 -0700 [SPARK-53044] Change Declarative Pipelines import alias convention from "sdp" to "dp" ### What changes were proposed in this pull request? Changes tests, docs, and examples from ```python from pyspark import pipelines as sdp ``` to ```python from pyspark import pipelines as dp ``` ### Why are the changes needed? It's more common for Python libraries to have 2-character import aliases. It's also easier for users to type. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existing tests ### Was this patch authored or co-authored using generative AI tooling? Closes #51752 from sryza/sdp-to-dp. Authored-by: Sandy Ryza <sandy.r...@databricks.com> Signed-off-by: Sandy Ryza <sandy.r...@databricks.com> --- docs/declarative-pipelines-programming-guide.md | 60 ++++++++-------- python/pyspark/pipelines/init_cli.py | 4 +- python/pyspark/pipelines/tests/test_cli.py | 6 +- python/pyspark/pipelines/tests/test_decorators.py | 8 +-- .../pipelines/tests/test_graph_element_registry.py | 18 ++--- .../pyspark/pipelines/tests/test_spark_connect.py | 6 +- .../connect/pipelines/PythonPipelineSuite.scala | 80 +++++++++++----------- 7 files changed, 91 insertions(+), 91 deletions(-) diff --git a/docs/declarative-pipelines-programming-guide.md b/docs/declarative-pipelines-programming-guide.md index 9b75c11a8aa8..3e33153e3d25 100644 --- a/docs/declarative-pipelines-programming-guide.md +++ b/docs/declarative-pipelines-programming-guide.md @@ -115,20 +115,20 @@ The `spark-pipelines` command line interface (CLI) is the primary way to execute ## Programming with SDP in Python -SDP Python functions are defined in the `pyspark.pipelines` module. Your pipelines implemented with the Python API must import this module. It's common to alias the module to `sdp` to limit the number of characters you need to type when using its APIs. +SDP Python functions are defined in the `pyspark.pipelines` module. Your pipelines implemented with the Python API must import this module. It's common to alias the module to `dp` to limit the number of characters you need to type when using its APIs. ```python -from pyspark import pipelines as sdp +from pyspark import pipelines as dp ``` ### Creating a Materialized View with Python -The `@sdp.materialized_view` decorator tells SDP to create a materialized view based on the results returned by a function that performs a batch read: +The `@dp.materialized_view` decorator tells SDP to create a materialized view based on the results returned by a function that performs a batch read: ```python -from pyspark import pipelines as sdp +from pyspark import pipelines as dp -@sdp.materialized_view +@dp.materialized_view def basic_mv(): return spark.table("samples.nyctaxi.trips") ``` @@ -136,21 +136,21 @@ def basic_mv(): Optionally, you can specify the table name using the `name` argument: ```python -from pyspark import pipelines as sdp +from pyspark import pipelines as dp -@sdp.materialized_view(name="trips_mv") +@dp.materialized_view(name="trips_mv") def basic_mv(): return spark.table("samples.nyctaxi.trips") ``` ### Creating a Temporary View with Python -The `@sdp.temporary_view` decorator tells SDP to create a temporary view based on the results returned by a function that performs a batch read: +The `@dp.temporary_view` decorator tells SDP to create a temporary view based on the results returned by a function that performs a batch read: ```python -from pyspark import pipelines as sdp +from pyspark import pipelines as dp -@sdp.temporary_view +@dp.temporary_view def basic_tv(): return spark.table("samples.nyctaxi.trips") ``` @@ -159,12 +159,12 @@ This temporary view can be read by other queries within the pipeline, but can't ### Creating a Streaming Table with Python -Similarly, you can create a streaming table by using the `@sdp.table` decorator with a function that performs a streaming read: +Similarly, you can create a streaming table by using the `@dp.table` decorator with a function that performs a streaming read: ```python -from pyspark import pipelines as sdp +from pyspark import pipelines as dp -@sdp.table +@dp.table def basic_st(): return spark.readStream.table("samples.nyctaxi.trips") ``` @@ -174,9 +174,9 @@ def basic_st(): SDP supports loading data from all formats supported by Spark. For example, you can create a streaming table whose query reads from a Kafka topic: ```python -from pyspark import pipelines as sdp +from pyspark import pipelines as dp -@sdp.table +@dp.table def ingestion_st(): return ( spark.readStream.format("kafka") @@ -189,9 +189,9 @@ def ingestion_st(): For batch reads: ```python -from pyspark import pipelines as sdp +from pyspark import pipelines as dp -@sdp.materialized_view +@dp.materialized_view def batch_mv(): return spark.read.format("json").load("/datasets/retail-org/sales_orders") ``` @@ -201,10 +201,10 @@ def batch_mv(): You can reference other tables defined in your pipeline in the same way you'd reference tables defined outside your pipeline: ```python -from pyspark import pipelines as sdp +from pyspark import pipelines as dp from pyspark.sql.functions import col -@sdp.table +@dp.table def orders(): return ( spark.readStream.format("kafka") @@ -213,11 +213,11 @@ def orders(): .load() ) -@sdp.materialized_view +@dp.materialized_view def customers(): return spark.read.format("csv").option("header", True).load("/datasets/retail-org/customers") -@sdp.materialized_view +@dp.materialized_view def customer_orders(): return (spark.table("orders") .join(spark.table("customers"), "customer_id") @@ -228,7 +228,7 @@ def customer_orders(): ) ) -@sdp.materialized_view +@dp.materialized_view def daily_orders_by_state(): return (spark.table("customer_orders") .groupBy("state", "order_date") @@ -241,10 +241,10 @@ def daily_orders_by_state(): You can use Python `for` loops to create multiple tables programmatically: ```python -from pyspark import pipelines as sdp +from pyspark import pipelines as dp from pyspark.sql.functions import collect_list, col -@sdp.temporary_view() +@dp.temporary_view() def customer_orders(): orders = spark.table("samples.tpch.orders") customer = spark.table("samples.tpch.customer") @@ -261,7 +261,7 @@ def customer_orders(): col("o_orderdate").alias("orderdate")) ) -@sdp.temporary_view() +@dp.temporary_view() def nation_region(): nation = spark.table("samples.tpch.nation") region = spark.table("samples.tpch.region") @@ -279,7 +279,7 @@ region_list = spark.table("samples.tpch.region").select(collect_list("r_name")). # Iterate through region names to create new region-specific materialized views for region in region_list: - @sdp.table(name=f"{region.lower().replace(' ', '_')}_customer_orders") + @dp.table(name=f"{region.lower().replace(' ', '_')}_customer_orders") def regional_customer_orders(region_filter=region): customer_orders = spark.table("customer_orders") nation_region = spark.table("nation_region") @@ -304,18 +304,18 @@ for region in region_list: You can create multiple flows that append data to the same target: ```python -from pyspark import pipelines as sdp +from pyspark import pipelines as dp # create a streaming table -sdp.create_streaming_table("customers_us") +dp.create_streaming_table("customers_us") # add the first append flow -@sdp.append_flow(target = "customers_us") +@dp.append_flow(target = "customers_us") def append1(): return spark.readStream.table("customers_us_west") # add the second append flow -@sdp.append_flow(target = "customers_us") +@dp.append_flow(target = "customers_us") def append2(): return spark.readStream.table("customers_us_east") ``` diff --git a/python/pyspark/pipelines/init_cli.py b/python/pyspark/pipelines/init_cli.py index 90a320dd9f5a..227e5aa5deca 100644 --- a/python/pyspark/pipelines/init_cli.py +++ b/python/pyspark/pipelines/init_cli.py @@ -26,12 +26,12 @@ definitions: include: transformations/**/*.sql """ -PYTHON_EXAMPLE = """from pyspark import pipelines as sdp +PYTHON_EXAMPLE = """from pyspark import pipelines as dp from pyspark.sql import DataFrame, SparkSession spark = SparkSession.active() -@sdp.materialized_view +@dp.materialized_view def example_python_materialized_view() -> DataFrame: return spark.range(10) """ diff --git a/python/pyspark/pipelines/tests/test_cli.py b/python/pyspark/pipelines/tests/test_cli.py index ff62ce42c4a3..8055723ddc5a 100644 --- a/python/pyspark/pipelines/tests/test_cli.py +++ b/python/pyspark/pipelines/tests/test_cli.py @@ -252,8 +252,8 @@ class CLIUtilityTests(unittest.TestCase): f.write( textwrap.dedent( """ - from pyspark import pipelines as sdp - @sdp.materialized_view + from pyspark import pipelines as dp + @dp.materialized_view def mv1(): raise NotImplementedError() """ @@ -264,7 +264,7 @@ class CLIUtilityTests(unittest.TestCase): f.write( textwrap.dedent( """ - from pyspark import pipelines as sdp + from pyspark import pipelines as dp def mv2(): raise NotImplementedError() """ diff --git a/python/pyspark/pipelines/tests/test_decorators.py b/python/pyspark/pipelines/tests/test_decorators.py index ac575b43d109..31a966da164d 100644 --- a/python/pyspark/pipelines/tests/test_decorators.py +++ b/python/pyspark/pipelines/tests/test_decorators.py @@ -18,12 +18,12 @@ import unittest from pyspark.errors import PySparkTypeError -from pyspark import pipelines as sdp +from pyspark import pipelines as dp class DecoratorsTest(unittest.TestCase): def test_dataset_name_not_string(self): - for decorator in [sdp.table, sdp.temporary_view, sdp.materialized_view]: + for decorator in [dp.table, dp.temporary_view, dp.materialized_view]: with self.assertRaises(PySparkTypeError) as context: @decorator(name=5) @@ -37,7 +37,7 @@ class DecoratorsTest(unittest.TestCase): }, context.exception.getMessageParameters() def test_invalid_partition_cols(self): - for decorator in [sdp.table, sdp.materialized_view]: + for decorator in [dp.table, dp.materialized_view]: with self.assertRaises(PySparkTypeError) as context: @decorator(partition_cols=["a", 1, 2]) # type: ignore @@ -51,7 +51,7 @@ class DecoratorsTest(unittest.TestCase): }, context.exception.getMessageParameters() def test_decorator_with_positional_arg(self): - for decorator in [sdp.table, sdp.temporary_view, sdp.materialized_view]: + for decorator in [dp.table, dp.temporary_view, dp.materialized_view]: with self.assertRaises(PySparkTypeError) as context: decorator("table1") diff --git a/python/pyspark/pipelines/tests/test_graph_element_registry.py b/python/pyspark/pipelines/tests/test_graph_element_registry.py index fb8504f4f8ed..2f9d2c69b7d5 100644 --- a/python/pyspark/pipelines/tests/test_graph_element_registry.py +++ b/python/pyspark/pipelines/tests/test_graph_element_registry.py @@ -19,7 +19,7 @@ import unittest from pyspark.errors import PySparkException from pyspark.pipelines.graph_element_registry import graph_element_registration_context -from pyspark import pipelines as sdp +from pyspark import pipelines as dp from pyspark.pipelines.tests.local_graph_element_registry import LocalGraphElementRegistry @@ -28,21 +28,21 @@ class GraphElementRegistryTest(unittest.TestCase): registry = LocalGraphElementRegistry() with graph_element_registration_context(registry): - @sdp.materialized_view + @dp.materialized_view def mv(): raise NotImplementedError() - @sdp.table + @dp.table def st(): raise NotImplementedError() - sdp.create_streaming_table("st2") + dp.create_streaming_table("st2") - @sdp.append_flow(target="st2") + @dp.append_flow(target="st2") def flow1(): raise NotImplementedError() - @sdp.append_flow(target="st2") + @dp.append_flow(target="st2") def flow2(): raise NotImplementedError() @@ -82,7 +82,7 @@ class GraphElementRegistryTest(unittest.TestCase): assert mv_flow_obj.source_code_location.filename.endswith("test_graph_element_registry.py") def test_definition_without_graph_element_registry(self): - for decorator in [sdp.table, sdp.temporary_view, sdp.materialized_view]: + for decorator in [dp.table, dp.temporary_view, dp.materialized_view]: with self.assertRaises(PySparkException) as context: @decorator @@ -95,7 +95,7 @@ class GraphElementRegistryTest(unittest.TestCase): ) with self.assertRaises(PySparkException) as context: - sdp.create_streaming_table("st") + dp.create_streaming_table("st") self.assertEqual( context.exception.getCondition(), @@ -104,7 +104,7 @@ class GraphElementRegistryTest(unittest.TestCase): with self.assertRaises(PySparkException) as context: - @sdp.append_flow(target="st") + @dp.append_flow(target="st") def b(): raise NotImplementedError() diff --git a/python/pyspark/pipelines/tests/test_spark_connect.py b/python/pyspark/pipelines/tests/test_spark_connect.py index 935295ec4a8c..6d81a98c8c44 100644 --- a/python/pyspark/pipelines/tests/test_spark_connect.py +++ b/python/pyspark/pipelines/tests/test_spark_connect.py @@ -31,7 +31,7 @@ from pyspark.pipelines.spark_connect_pipeline import ( start_run, handle_pipeline_events, ) -from pyspark import pipelines as sdp +from pyspark import pipelines as dp from pyspark.testing.connectutils import ( ReusedConnectTestCase, should_test_connect, @@ -47,7 +47,7 @@ class SparkConnectPipelinesTest(ReusedConnectTestCase): with graph_element_registration_context(registry): - @sdp.materialized_view + @dp.materialized_view def mv(): return self.spark.range(1) @@ -67,7 +67,7 @@ class SparkConnectPipelinesTest(ReusedConnectTestCase): with graph_element_registration_context(registry): - @sdp.table + @dp.table def st(): # Invalid because a streaming query is expected return self.spark.range(1) diff --git a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PythonPipelineSuite.scala b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PythonPipelineSuite.scala index 1bc2172d86e5..8e475446e8c4 100644 --- a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PythonPipelineSuite.scala +++ b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PythonPipelineSuite.scala @@ -49,7 +49,7 @@ class PythonPipelineSuite val pythonCode = s""" |from pyspark.sql import SparkSession - |from pyspark import pipelines as sdp + |from pyspark import pipelines as dp |from pyspark.pipelines.spark_connect_graph_element_registry import ( | SparkConnectGraphElementRegistry, |) @@ -105,7 +105,7 @@ class PythonPipelineSuite test("basic") { val graph = buildGraph(""" - |@sdp.table + |@dp.table |def table1(): | return spark.readStream.format("rate").load() |""".stripMargin) @@ -118,19 +118,19 @@ class PythonPipelineSuite test("basic with inverted topological order") { // This graph is purposefully in the wrong topological order to test the topological sort val graph = buildGraph(""" - |@sdp.table() + |@dp.table() |def b(): | return spark.readStream.table("a") | - |@sdp.table() + |@dp.table() |def c(): | return spark.readStream.table("a") | - |@sdp.materialized_view() + |@dp.materialized_view() |def d(): | return spark.read.table("a") | - |@sdp.materialized_view() + |@dp.materialized_view() |def a(): | return spark.range(5) |""".stripMargin) @@ -141,11 +141,11 @@ class PythonPipelineSuite test("flows") { val graph = buildGraph(""" - |@sdp.table() + |@dp.table() |def a(): | return spark.readStream.format("rate").load() | - |@sdp.append_flow(target = "a") + |@dp.append_flow(target = "a") |def supplement(): | return spark.readStream.format("rate").load() |""".stripMargin).resolve().validate() @@ -160,15 +160,15 @@ class PythonPipelineSuite test("referencing internal datasets") { val graph = buildGraph(""" - |@sdp.materialized_view + |@dp.materialized_view |def src(): | return spark.range(5) | - |@sdp.materialized_view + |@dp.materialized_view |def a(): | return spark.read.table("src") | - |@sdp.table + |@dp.table |def b(): | return spark.readStream.table("src") |""".stripMargin).resolve().validate() @@ -191,15 +191,15 @@ class PythonPipelineSuite test("referencing external datasets") { sql("CREATE TABLE spark_catalog.default.src AS SELECT * FROM RANGE(5)") val graph = buildGraph(""" - |@sdp.materialized_view + |@dp.materialized_view |def a(): | return spark.read.table("spark_catalog.default.src") | - |@sdp.materialized_view + |@dp.materialized_view |def b(): | return spark.table("spark_catalog.default.src") | - |@sdp.table + |@dp.table |def c(): | return spark.readStream.table("spark_catalog.default.src") |""".stripMargin).resolve().validate() @@ -218,15 +218,15 @@ class PythonPipelineSuite test("referencing internal datasets failed") { val graph = buildGraph(""" - |@sdp.table + |@dp.table |def a(): | return spark.read.table("src") | - |@sdp.table + |@dp.table |def b(): | return spark.table("src") | - |@sdp.table + |@dp.table |def c(): | return spark.readStream.table("src") |""".stripMargin).resolve() @@ -240,15 +240,15 @@ class PythonPipelineSuite test("referencing external datasets failed") { val graph = buildGraph(""" - |@sdp.table + |@dp.table |def a(): | return spark.read.table("spark_catalog.default.src") | - |@sdp.materialized_view + |@dp.materialized_view |def b(): | return spark.table("spark_catalog.default.src") | - |@sdp.materialized_view + |@dp.materialized_view |def c(): | return spark.readStream.table("spark_catalog.default.src") |""".stripMargin).resolve() @@ -260,11 +260,11 @@ class PythonPipelineSuite test("create dataset with the same name will fail") { val ex = intercept[AnalysisException] { buildGraph(s""" - |@sdp.materialized_view + |@dp.materialized_view |def a(): | return spark.range(1) | - |@sdp.materialized_view(name = "a") + |@dp.materialized_view(name = "a") |def b(): | return spark.range(1) |""".stripMargin) @@ -274,19 +274,19 @@ class PythonPipelineSuite test("create datasets with fully/partially qualified names") { val graph = buildGraph(s""" - |@sdp.table + |@dp.table |def mv_1(): | return spark.range(5) | - |@sdp.table(name = "schema_a.mv_2") + |@dp.table(name = "schema_a.mv_2") |def irrelevant_1(): | return spark.range(5) | - |@sdp.table(name = "st_1") + |@dp.table(name = "st_1") |def irrelevant_2(): | return spark.readStream.format("rate").load() | - |@sdp.table(name = "schema_b.st_2") + |@dp.table(name = "schema_b.st_2") |def irrelevant_3(): | return spark.readStream.format("rate").load() |""".stripMargin).resolve() @@ -333,11 +333,11 @@ class PythonPipelineSuite test("create datasets with three part names") { val graphTry = Try { buildGraph(s""" - |@sdp.table(name = "some_catalog.some_schema.mv") + |@dp.table(name = "some_catalog.some_schema.mv") |def irrelevant_1(): | return spark.range(5) | - |@sdp.table(name = "some_catalog.some_schema.st") + |@dp.table(name = "some_catalog.some_schema.st") |def irrelevant_2(): | return spark.readStream.format("rate").load() |""".stripMargin).resolve() @@ -356,18 +356,18 @@ class PythonPipelineSuite test("temporary views works") { // A table is defined since pipeline with only temporary views is invalid. val graph = buildGraph(s""" - |@sdp.table + |@dp.table |def mv_1(): | return spark.range(5) - |@sdp.temporary_view + |@dp.temporary_view |def view_1(): | return spark.range(5) | - |@sdp.temporary_view(name= "view_2") + |@dp.temporary_view(name= "view_2") |def irrelevant_1(): | return spark.read.table("view_1") | - |@sdp.temporary_view(name= "view_3") + |@dp.temporary_view(name= "view_3") |def irrelevant_2(): | return spark.read.table("view_1") |""".stripMargin).resolve() @@ -385,11 +385,11 @@ class PythonPipelineSuite test("create named flow with multipart name will fail") { val ex = intercept[RuntimeException] { buildGraph(s""" - |@sdp.table + |@dp.table |def src(): | return spark.readStream.table("src0") | - |@sdp.append_flow(name ="some_schema.some_flow", target = "src") + |@dp.append_flow(name ="some_schema.some_flow", target = "src") |def some_flow(): | return spark.readStream.format("rate").load() |""".stripMargin) @@ -399,11 +399,11 @@ class PythonPipelineSuite test("create flow with multipart target and no explicit name succeeds") { val graph = buildGraph(""" - |@sdp.table() + |@dp.table() |def a(): | return spark.readStream.format("rate").load() | - |@sdp.append_flow(target = "default.a") + |@dp.append_flow(target = "default.a") |def supplement(): | return spark.readStream.format("rate").load() |""".stripMargin).resolve().validate() @@ -418,11 +418,11 @@ class PythonPipelineSuite test("create named flow with multipart target succeeds") { val graph = buildGraph(""" - |@sdp.table() + |@dp.table() |def a(): | return spark.readStream.format("rate").load() | - |@sdp.append_flow(target = "default.a", name = "something") + |@dp.append_flow(target = "default.a", name = "something") |def supplement(): | return spark.readStream.format("rate").load() |""".stripMargin) @@ -449,7 +449,7 @@ class PythonPipelineSuite checkError( exception = intercept[AnalysisException] { buildGraph(s""" - |@sdp.temporary_view + |@dp.temporary_view |def view_1(): | return spark.range(5) |""".stripMargin) @@ -462,7 +462,7 @@ class PythonPipelineSuite checkError( exception = intercept[AnalysisException] { buildGraph(s""" - |@sdp.append_flow(target = "a") + |@dp.append_flow(target = "a") |def flow(): | return spark.range(5) |""".stripMargin) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org