This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new e638f6d20f25 [SPARK-50778][PYTHON] Add metadataColumn to PySpark DataFrame
e638f6d20f25 is described below

commit e638f6d20f25cecce91ed907fc04294220abe23c
Author: Takuya Ueshin <[email protected]>
AuthorDate: Fri Jan 10 10:47:37 2025 +0800

    [SPARK-50778][PYTHON] Add metadataColumn to PySpark DataFrame
    
    ### What changes were proposed in this pull request?
    
    Add `metadataColumn` to PySpark DataFrame.
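
    For illustration, a minimal usage sketch (the catalog and table names are
    assumptions borrowed from the test added in this patch; any data source
    that defines metadata columns works the same way):

    ```py
    df = spark.table("testcat.t")
    # Select the metadata column "index" even though the table also
    # defines a data column named "index".
    df.select(df.metadataColumn("index"))
    ```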
    
    ### Why are the changes needed?
    
    Feature parity: the API exists in the Scala/Java `Dataset` but is missing in PySpark.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, the new `DataFrame.metadataColumn` API will be available.
    
    ### How was this patch tested?
    
    Added the related test (`test_metadata_column`).
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #49430 from ueshin/issues/SPARK-50778/metadata_column.
    
    Authored-by: Takuya Ueshin <[email protected]>
    Signed-off-by: Ruifeng Zheng <[email protected]>
---
 python/pyspark/sql/classic/dataframe.py    |  9 +++++++++
 python/pyspark/sql/connect/dataframe.py    | 11 ++++++++++-
 python/pyspark/sql/connect/expressions.py  | 10 +++++++++-
 python/pyspark/sql/dataframe.py            | 22 ++++++++++++++++++++++
 python/pyspark/sql/tests/test_dataframe.py | 20 ++++++++++++++++++++
 5 files changed, 70 insertions(+), 2 deletions(-)

diff --git a/python/pyspark/sql/classic/dataframe.py b/python/pyspark/sql/classic/dataframe.py
index 8beabda6c135..84498f1b2294 100644
--- a/python/pyspark/sql/classic/dataframe.py
+++ b/python/pyspark/sql/classic/dataframe.py
@@ -651,6 +651,15 @@ class DataFrame(ParentDataFrame, PandasMapOpsMixin, PandasConversionMixin):
     def columns(self) -> List[str]:
         return [f.name for f in self.schema.fields]
 
+    def metadataColumn(self, colName: str) -> Column:
+        if not isinstance(colName, str):
+            raise PySparkTypeError(
+                errorClass="NOT_STR",
+                messageParameters={"arg_name": "colName", "arg_type": type(colName).__name__},
+            )
+        jc = self._jdf.metadataColumn(colName)
+        return Column(jc)
+
     def colRegex(self, colName: str) -> Column:
         if not isinstance(colName, str):
             raise PySparkTypeError(
diff --git a/python/pyspark/sql/connect/dataframe.py b/python/pyspark/sql/connect/dataframe.py
index 789292bdd56f..76b7881f234f 100644
--- a/python/pyspark/sql/connect/dataframe.py
+++ b/python/pyspark/sql/connect/dataframe.py
@@ -274,6 +274,14 @@ class DataFrame(ParentDataFrame):
         res._cached_schema = self._cached_schema
         return res
 
+    def metadataColumn(self, colName: str) -> Column:
+        if not isinstance(colName, str):
+            raise PySparkTypeError(
+                errorClass="NOT_STR",
+                messageParameters={"arg_name": "colName", "arg_type": type(colName).__name__},
+            )
+        return self._col(colName, is_metadata_column=True)
+
     def colRegex(self, colName: str) -> Column:
         from pyspark.sql.connect.column import Column as ConnectColumn
 
@@ -1750,13 +1758,14 @@ class DataFrame(ParentDataFrame):
                 messageParameters={"arg_name": "item", "arg_type": 
type(item).__name__},
             )
 
-    def _col(self, name: str) -> Column:
+    def _col(self, name: str, is_metadata_column: bool = False) -> Column:
         from pyspark.sql.connect.column import Column as ConnectColumn
 
         return ConnectColumn(
             ColumnReference(
                 unparsed_identifier=name,
                 plan_id=self._plan._plan_id,
+                is_metadata_column=is_metadata_column,
             )
         )
 
diff --git a/python/pyspark/sql/connect/expressions.py b/python/pyspark/sql/connect/expressions.py
index 413a69181683..c32db14968c6 100644
--- a/python/pyspark/sql/connect/expressions.py
+++ b/python/pyspark/sql/connect/expressions.py
@@ -524,7 +524,12 @@ class ColumnReference(Expression):
     treat it as an unresolved attribute. Attributes that have the same fully
     qualified name are identical"""
 
-    def __init__(self, unparsed_identifier: str, plan_id: Optional[int] = None) -> None:
+    def __init__(
+        self,
+        unparsed_identifier: str,
+        plan_id: Optional[int] = None,
+        is_metadata_column: bool = False,
+    ) -> None:
         super().__init__()
         assert isinstance(unparsed_identifier, str)
         self._unparsed_identifier = unparsed_identifier
@@ -532,6 +537,8 @@ class ColumnReference(Expression):
         assert plan_id is None or isinstance(plan_id, int)
         self._plan_id = plan_id
 
+        self._is_metadata_column = is_metadata_column
+
     def name(self) -> str:
         """Returns the qualified name of the column reference."""
         return self._unparsed_identifier
@@ -542,6 +549,7 @@ class ColumnReference(Expression):
         expr.unresolved_attribute.unparsed_identifier = self._unparsed_identifier
         if self._plan_id is not None:
             expr.unresolved_attribute.plan_id = self._plan_id
+        expr.unresolved_attribute.is_metadata_column = self._is_metadata_column
         return expr
 
     def __repr__(self) -> str:
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 394ac6bdb69c..f2c0bc815582 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -2277,6 +2277,28 @@ class DataFrame:
         """
         ...
 
+    @dispatch_df_method
+    def metadataColumn(self, colName: str) -> Column:
+        """
+        Selects a metadata column based on its logical column name and returns it as a
+        :class:`Column`.
+
+        A metadata column can be accessed this way even if the underlying data source defines a data
+        column with a conflicting name.
+
+        .. versionadded:: 4.0.0
+
+        Parameters
+        ----------
+        colName : str
+            string, metadata column name
+
+        Returns
+        -------
+        :class:`Column`
+        """
+        ...
+
     @dispatch_df_method
     def colRegex(self, colName: str) -> Column:
         """
diff --git a/python/pyspark/sql/tests/test_dataframe.py b/python/pyspark/sql/tests/test_dataframe.py
index a0234a527f63..e85877cc87e0 100644
--- a/python/pyspark/sql/tests/test_dataframe.py
+++ b/python/pyspark/sql/tests/test_dataframe.py
@@ -1056,6 +1056,26 @@ class DataFrameTestsMixin:
             messageParameters={"reason": "Index column must be an atomic 
attribute"},
         )
 
+    def test_metadata_column(self):
+        with self.sql_conf(
+            {"spark.sql.catalog.testcat": 
"org.apache.spark.sql.connector.catalog.InMemoryCatalog"}
+        ):
+            tbl = "testcat.t"
+            with self.table(tbl):
+                self.spark.sql(
+                    f"""
+                    CREATE TABLE {tbl} (index bigint, data string)
+                    PARTITIONED BY (bucket(4, index), index)
+                    """
+                )
+                self.spark.sql(f"""INSERT INTO {tbl} VALUES (1, 'a'), (2, 
'b'), (3, 'c')""")
+
+                df = self.spark.sql(f"""SELECT * FROM {tbl}""")
+                assertDataFrameEqual(
+                    df.select(df.metadataColumn("index")),
+                    [Row(0), Row(0), Row(0)],
+                )
+
 
 class DataFrameTests(DataFrameTestsMixin, ReusedSQLTestCase):
     def test_query_execution_unsupported_in_classic(self):

