This is an automated email from the ASF dual-hosted git repository.
ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new e638f6d20f25 [SPARK-50778][PYTHON] Add metadataColumn to PySpark DataFrame
e638f6d20f25 is described below
commit e638f6d20f25cecce91ed907fc04294220abe23c
Author: Takuya Ueshin <[email protected]>
AuthorDate: Fri Jan 10 10:47:37 2025 +0800
[SPARK-50778][PYTHON] Add metadataColumn to PySpark DataFrame
### What changes were proposed in this pull request?
Add `metadataColumn` to PySpark DataFrame.
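For illustration, a minimal usage sketch (mirroring the test added in this PR; the `testcat.t` table and its `index` metadata column come from that test's setup, which backs the table with `InMemoryCatalog`, and are not guaranteed by every data source):

```python
df = spark.sql("SELECT * FROM testcat.t")

# Resolves the metadata column "index" even though the table also
# defines a data column named "index".
df.select(df.metadataColumn("index")).show()
```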
### Why are the changes needed?
Feature parity: The API is missing in PySpark.
### Does this PR introduce _any_ user-facing change?
Yes, the new API will be available.
### How was this patch tested?
Added the related test.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #49430 from ueshin/issues/SPARK-50778/metadata_column.
Authored-by: Takuya Ueshin <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
---
 python/pyspark/sql/classic/dataframe.py    |  9 +++++++++
 python/pyspark/sql/connect/dataframe.py    | 11 ++++++++++-
 python/pyspark/sql/connect/expressions.py  | 10 +++++++++-
 python/pyspark/sql/dataframe.py            | 22 ++++++++++++++++++++++
 python/pyspark/sql/tests/test_dataframe.py | 20 ++++++++++++++++++++
 5 files changed, 70 insertions(+), 2 deletions(-)
diff --git a/python/pyspark/sql/classic/dataframe.py b/python/pyspark/sql/classic/dataframe.py
index 8beabda6c135..84498f1b2294 100644
--- a/python/pyspark/sql/classic/dataframe.py
+++ b/python/pyspark/sql/classic/dataframe.py
@@ -651,6 +651,15 @@ class DataFrame(ParentDataFrame, PandasMapOpsMixin, PandasConversionMixin):
     def columns(self) -> List[str]:
         return [f.name for f in self.schema.fields]

+    def metadataColumn(self, colName: str) -> Column:
+        if not isinstance(colName, str):
+            raise PySparkTypeError(
+                errorClass="NOT_STR",
+                messageParameters={"arg_name": "colName", "arg_type": type(colName).__name__},
+            )
+        jc = self._jdf.metadataColumn(colName)
+        return Column(jc)
+
     def colRegex(self, colName: str) -> Column:
         if not isinstance(colName, str):
             raise PySparkTypeError(
diff --git a/python/pyspark/sql/connect/dataframe.py b/python/pyspark/sql/connect/dataframe.py
index 789292bdd56f..76b7881f234f 100644
--- a/python/pyspark/sql/connect/dataframe.py
+++ b/python/pyspark/sql/connect/dataframe.py
@@ -274,6 +274,14 @@ class DataFrame(ParentDataFrame):
         res._cached_schema = self._cached_schema
         return res

+    def metadataColumn(self, colName: str) -> Column:
+        if not isinstance(colName, str):
+            raise PySparkTypeError(
+                errorClass="NOT_STR",
+                messageParameters={"arg_name": "colName", "arg_type": type(colName).__name__},
+            )
+        return self._col(colName, is_metadata_column=True)
+
     def colRegex(self, colName: str) -> Column:
         from pyspark.sql.connect.column import Column as ConnectColumn

@@ -1750,13 +1758,14 @@ class DataFrame(ParentDataFrame):
                 messageParameters={"arg_name": "item", "arg_type": type(item).__name__},
             )

-    def _col(self, name: str) -> Column:
+    def _col(self, name: str, is_metadata_column: bool = False) -> Column:
         from pyspark.sql.connect.column import Column as ConnectColumn

         return ConnectColumn(
             ColumnReference(
                 unparsed_identifier=name,
                 plan_id=self._plan._plan_id,
+                is_metadata_column=is_metadata_column,
             )
         )
diff --git a/python/pyspark/sql/connect/expressions.py b/python/pyspark/sql/connect/expressions.py
index 413a69181683..c32db14968c6 100644
--- a/python/pyspark/sql/connect/expressions.py
+++ b/python/pyspark/sql/connect/expressions.py
@@ -524,7 +524,12 @@ class ColumnReference(Expression):
     treat it as an unresolved attribute. Attributes that have the same fully
     qualified name are identical"""

-    def __init__(self, unparsed_identifier: str, plan_id: Optional[int] = None) -> None:
+    def __init__(
+        self,
+        unparsed_identifier: str,
+        plan_id: Optional[int] = None,
+        is_metadata_column: bool = False,
+    ) -> None:
         super().__init__()
         assert isinstance(unparsed_identifier, str)
         self._unparsed_identifier = unparsed_identifier
@@ -532,6 +537,8 @@ class ColumnReference(Expression):
         assert plan_id is None or isinstance(plan_id, int)
         self._plan_id = plan_id

+        self._is_metadata_column = is_metadata_column
+
     def name(self) -> str:
         """Returns the qualified name of the column reference."""
         return self._unparsed_identifier
@@ -542,6 +549,7 @@ class ColumnReference(Expression):
         expr.unresolved_attribute.unparsed_identifier = self._unparsed_identifier
         if self._plan_id is not None:
             expr.unresolved_attribute.plan_id = self._plan_id
+        expr.unresolved_attribute.is_metadata_column = self._is_metadata_column
         return expr

     def __repr__(self) -> str:
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 394ac6bdb69c..f2c0bc815582 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -2277,6 +2277,28 @@ class DataFrame:
         """
         ...

+    @dispatch_df_method
+    def metadataColumn(self, colName: str) -> Column:
+        """
+        Selects a metadata column based on its logical column name and returns it as a
+        :class:`Column`.
+
+        A metadata column can be accessed this way even if the underlying data source defines a data
+        column with a conflicting name.
+
+        .. versionadded:: 4.0.0
+
+        Parameters
+        ----------
+        colName : str
+            string, metadata column name
+
+        Returns
+        -------
+        :class:`Column`
+        """
+        ...
+
     @dispatch_df_method
     def colRegex(self, colName: str) -> Column:
         """
diff --git a/python/pyspark/sql/tests/test_dataframe.py b/python/pyspark/sql/tests/test_dataframe.py
index a0234a527f63..e85877cc87e0 100644
--- a/python/pyspark/sql/tests/test_dataframe.py
+++ b/python/pyspark/sql/tests/test_dataframe.py
@@ -1056,6 +1056,26 @@ class DataFrameTestsMixin:
                 messageParameters={"reason": "Index column must be an atomic attribute"},
             )

+    def test_metadata_column(self):
+        with self.sql_conf(
+            {"spark.sql.catalog.testcat": "org.apache.spark.sql.connector.catalog.InMemoryCatalog"}
+        ):
+            tbl = "testcat.t"
+            with self.table(tbl):
+                self.spark.sql(
+                    f"""
+                    CREATE TABLE {tbl} (index bigint, data string)
+                    PARTITIONED BY (bucket(4, index), index)
+                    """
+                )
+                self.spark.sql(f"""INSERT INTO {tbl} VALUES (1, 'a'), (2, 'b'), (3, 'c')""")
+
+                df = self.spark.sql(f"""SELECT * FROM {tbl}""")
+                assertDataFrameEqual(
+                    df.select(df.metadataColumn("index")),
+                    [Row(0), Row(0), Row(0)],
+                )
+

 class DataFrameTests(DataFrameTestsMixin, ReusedSQLTestCase):
     def test_query_execution_unsupported_in_classic(self):