This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 81c29eef4ad3 [SPARK-52238][SDP] Rename Pipeline Spec Field "definitions" to "libraries"
81c29eef4ad3 is described below

commit 81c29eef4ad3bc1cab187bc6bf9fe1e5ab51e7c6
Author: Jacky Wang <jacky.w...@databricks.com>
AuthorDate: Wed Sep 10 16:45:55 2025 +0800

    [SPARK-52238][SDP] Rename Pipeline Spec Field "definitions" to "libraries"
    
    ### What changes were proposed in this pull request?
    
    Rename the pipeline spec field from "definitions" to "libraries".
    This field allows users to include pipeline source code files.
    
    ```diff
      name: libraries-test
    - definitions:
    + libraries:
        - glob:
            include: transformations/**/*.py
        - glob:
            include: transformations/**/*.sql
    
    ```
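
    For reference, a minimal sketch of how the renamed field round-trips
    through the spec parser (based on `unpack_pipeline_spec` and the updated
    tests in this diff):

    ```python
    from pyspark.pipelines.cli import unpack_pipeline_spec

    # Parse a spec dict that uses the new "libraries" field name.
    spec = unpack_pipeline_spec(
        {
            "name": "libraries-test",
            "libraries": [
                {"glob": {"include": "transformations/**/*.py"}},
                {"glob": {"include": "transformations/**/*.sql"}},
            ],
        }
    )
    assert spec.libraries[0].include == "transformations/**/*.py"
    ```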
    
    ### Why are the changes needed?
    
    This rename opens up the possibility of adding other types of dependencies
    to a pipeline execution, such as Python wheels. `libraries` is a more
    general term.
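
    For illustration only, a hypothetical future spec shape (the `whl` entry
    type is an assumption and is NOT part of this change; only `glob` entries
    are parsed today):

    ```python
    # Hypothetical future shape -- "whl" is an illustrative assumption;
    # unpack_pipeline_spec only understands "glob" entries as of this change.
    future_spec = {
        "name": "libraries-test",
        "libraries": [
            {"glob": {"include": "transformations/**/*.py"}},
            {"whl": {"path": "dist/my_pipeline-0.1-py3-none-any.whl"}},
        ],
    }
    ```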
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, but SDP has not been released yet.
    
    ### How was this patch tested?
    
    Existing tests
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #52294 from JiaqiWang18/rename-spec-field-libraries.
    
    Authored-by: Jacky Wang <jacky.w...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 python/pyspark/pipelines/cli.py            | 20 ++++++++--------
 python/pyspark/pipelines/init_cli.py       |  2 +-
 python/pyspark/pipelines/tests/test_cli.py | 38 +++++++++++++++---------------
 3 files changed, 30 insertions(+), 30 deletions(-)

diff --git a/python/pyspark/pipelines/cli.py b/python/pyspark/pipelines/cli.py
index 43f9ae150f3f..dcfda1959a2e 100644
--- a/python/pyspark/pipelines/cli.py
+++ b/python/pyspark/pipelines/cli.py
@@ -52,8 +52,8 @@ PIPELINE_SPEC_FILE_NAMES = ["pipeline.yaml", "pipeline.yml"]
 
 
 @dataclass(frozen=True)
-class DefinitionsGlob:
-    """A glob pattern for finding pipeline definitions files."""
+class LibrariesGlob:
+    """A glob pattern for finding pipeline source codes."""
 
     include: str
 
@@ -66,14 +66,14 @@ class PipelineSpec:
     :param catalog: The default catalog to use for the pipeline.
     :param database: The default database to use for the pipeline.
     :param configuration: A dictionary of Spark configuration properties to set for the pipeline.
-    :param definitions: A list of glob patterns for finding pipeline definitions files.
+    :param libraries: A list of glob patterns for finding pipeline source codes.
     """
 
     name: str
     catalog: Optional[str]
     database: Optional[str]
     configuration: Mapping[str, str]
-    definitions: Sequence[DefinitionsGlob]
+    libraries: Sequence[LibrariesGlob]
 
 
 def find_pipeline_spec(current_dir: Path) -> Path:
@@ -113,7 +113,7 @@ def load_pipeline_spec(spec_path: Path) -> PipelineSpec:
 
 
 def unpack_pipeline_spec(spec_data: Mapping[str, Any]) -> PipelineSpec:
-    ALLOWED_FIELDS = {"name", "catalog", "database", "schema", "configuration", "definitions"}
+    ALLOWED_FIELDS = {"name", "catalog", "database", "schema", "configuration", "libraries"}
     REQUIRED_FIELDS = ["name"]
     for key in spec_data.keys():
         if key not in ALLOWED_FIELDS:
@@ -133,9 +133,9 @@ def unpack_pipeline_spec(spec_data: Mapping[str, Any]) -> PipelineSpec:
         catalog=spec_data.get("catalog"),
         database=spec_data.get("database", spec_data.get("schema")),
         configuration=validate_str_dict(spec_data.get("configuration", {}), "configuration"),
-        definitions=[
-            DefinitionsGlob(include=entry["glob"]["include"])
-            for entry in spec_data.get("definitions", [])
+        libraries=[
+            LibrariesGlob(include=entry["glob"]["include"])
+            for entry in spec_data.get("libraries", [])
         ],
     )
 
@@ -178,8 +178,8 @@ def register_definitions(
     with change_dir(path):
         with graph_element_registration_context(registry):
             log_with_curr_timestamp(f"Loading definitions. Root directory: '{path}'.")
-            for definition_glob in spec.definitions:
-                glob_expression = definition_glob.include
+            for libraries_glob in spec.libraries:
+                glob_expression = libraries_glob.include
                 matching_files = [p for p in path.glob(glob_expression) if p.is_file()]
                 log_with_curr_timestamp(
                     f"Found {len(matching_files)} files matching glob '{glob_expression}'"
diff --git a/python/pyspark/pipelines/init_cli.py b/python/pyspark/pipelines/init_cli.py
index 227e5aa5deca..89b998bd4f32 100644
--- a/python/pyspark/pipelines/init_cli.py
+++ b/python/pyspark/pipelines/init_cli.py
@@ -19,7 +19,7 @@ from pathlib import Path
 
 SPEC = """
 name: {{ name }}
-definitions:
+libraries:
   - glob:
       include: transformations/**/*.py
   - glob:
diff --git a/python/pyspark/pipelines/tests/test_cli.py b/python/pyspark/pipelines/tests/test_cli.py
index 8055723ddc5a..fc238fac1786 100644
--- a/python/pyspark/pipelines/tests/test_cli.py
+++ b/python/pyspark/pipelines/tests/test_cli.py
@@ -34,7 +34,7 @@ if should_test_connect and have_yaml:
         load_pipeline_spec,
         register_definitions,
         unpack_pipeline_spec,
-        DefinitionsGlob,
+        LibrariesGlob,
         PipelineSpec,
         run,
     )
@@ -58,7 +58,7 @@ class CLIUtilityTests(unittest.TestCase):
                         "key1": "value1",
                         "key2": "value2"
                     },
-                    "definitions": [
+                    "libraries": [
                         {"glob": {"include": "test_include"}}
                     ]
                 }
@@ -70,8 +70,8 @@ class CLIUtilityTests(unittest.TestCase):
             assert spec.catalog == "test_catalog"
             assert spec.database == "test_database"
             assert spec.configuration == {"key1": "value1", "key2": "value2"}
-            assert len(spec.definitions) == 1
-            assert spec.definitions[0].include == "test_include"
+            assert len(spec.libraries) == 1
+            assert spec.libraries[0].include == "test_include"
 
     def test_load_pipeline_spec_name_is_required(self):
         with tempfile.NamedTemporaryFile(mode="w") as tmpfile:
@@ -84,7 +84,7 @@ class CLIUtilityTests(unittest.TestCase):
                         "key1": "value1",
                         "key2": "value2"
                     },
-                    "definitions": [
+                    "libraries": [
                         {"glob": {"include": "test_include"}}
                     ]
                 }
@@ -110,7 +110,7 @@ class CLIUtilityTests(unittest.TestCase):
                         "key1": "value1",
                         "key2": "value2"
                     },
-                    "definitions": [
+                    "libraries": [
                         {"glob": {"include": "test_include"}}
                     ]
                 }
@@ -121,8 +121,8 @@ class CLIUtilityTests(unittest.TestCase):
             assert spec.catalog == "test_catalog"
             assert spec.database == "test_database"
             assert spec.configuration == {"key1": "value1", "key2": "value2"}
-            assert len(spec.definitions) == 1
-            assert spec.definitions[0].include == "test_include"
+            assert len(spec.libraries) == 1
+            assert spec.libraries[0].include == "test_include"
 
     def test_load_pipeline_spec_invalid(self):
         with tempfile.NamedTemporaryFile(mode="w") as tmpfile:
@@ -134,7 +134,7 @@ class CLIUtilityTests(unittest.TestCase):
                         "key1": "value1",
                         "key2": "value2"
                     },
-                    "definitions": [
+                    "libraries": [
                         {"glob": {"include": "test_include"}}
                     ]
                 }
@@ -150,7 +150,7 @@ class CLIUtilityTests(unittest.TestCase):
 
     def test_unpack_empty_pipeline_spec(self):
         empty_spec = PipelineSpec(
-            name="test_pipeline", catalog=None, database=None, 
configuration={}, definitions=[]
+            name="test_pipeline", catalog=None, database=None, 
configuration={}, libraries=[]
         )
         self.assertEqual(unpack_pipeline_spec({"name": "test_pipeline"}), 
empty_spec)
 
@@ -176,7 +176,7 @@ class CLIUtilityTests(unittest.TestCase):
                     {
                         "catalog": "test_catalog",
                         "configuration": {},
-                        "definitions": []
+                        "libraries": []
                     }
                     """
                 )
@@ -193,7 +193,7 @@ class CLIUtilityTests(unittest.TestCase):
                     {
                         "catalog": "test_catalog",
                         "configuration": {},
-                        "definitions": []
+                        "libraries": []
                     }
                     """
                 )
@@ -226,7 +226,7 @@ class CLIUtilityTests(unittest.TestCase):
                     {
                         "catalog": "test_catalog",
                         "configuration": {},
-                        "definitions": []
+                        "libraries": []
                     }
                     """
                 )
@@ -240,7 +240,7 @@ class CLIUtilityTests(unittest.TestCase):
             catalog=None,
             database=None,
             configuration={},
-            definitions=[DefinitionsGlob(include="subdir1/*")],
+            libraries=[LibrariesGlob(include="subdir1/*")],
         )
         with tempfile.TemporaryDirectory() as temp_dir:
             outer_dir = Path(temp_dir)
@@ -248,7 +248,7 @@ class CLIUtilityTests(unittest.TestCase):
             subdir1.mkdir()
             subdir2 = outer_dir / "subdir2"
             subdir2.mkdir()
-            with (subdir1 / "definitions.py").open("w") as f:
+            with (subdir1 / "libraries.py").open("w") as f:
                 f.write(
                     textwrap.dedent(
                         """
@@ -260,7 +260,7 @@ class CLIUtilityTests(unittest.TestCase):
                     )
                 )
 
-            with (subdir2 / "definitions.py").open("w") as f:
+            with (subdir2 / "libraries.py").open("w") as f:
                 f.write(
                     textwrap.dedent(
                         """
@@ -283,7 +283,7 @@ class CLIUtilityTests(unittest.TestCase):
             catalog=None,
             database=None,
             configuration={},
-            definitions=[DefinitionsGlob(include="*")],
+            libraries=[LibrariesGlob(include="*")],
         )
         with tempfile.TemporaryDirectory() as temp_dir:
             outer_dir = Path(temp_dir)
@@ -301,7 +301,7 @@ class CLIUtilityTests(unittest.TestCase):
             catalog=None,
             database=None,
             configuration={},
-            definitions=[DefinitionsGlob(include="*")],
+            libraries=[LibrariesGlob(include="*")],
         )
         with tempfile.TemporaryDirectory() as temp_dir:
             outer_dir = Path(temp_dir)
@@ -355,7 +355,7 @@ class CLIUtilityTests(unittest.TestCase):
                         catalog=None,
                         database=None,
                         configuration={},
-                        definitions=[DefinitionsGlob(include="defs.py")],
+                        libraries=[LibrariesGlob(include="defs.py")],
                     ),
                 )
 

