This is an automated email from the ASF dual-hosted git repository.
ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new a57f94b03e30 [SPARK-47638][PS][CONNECT] Skip column name validation in PS
a57f94b03e30 is described below
commit a57f94b03e302e97c1d650a9f64596a82506df2f
Author: Ruifeng Zheng <[email protected]>
AuthorDate: Fri Mar 29 10:51:56 2024 +0800
[SPARK-47638][PS][CONNECT] Skip column name validation in PS
### What changes were proposed in this pull request?
Skip column name validation in PS
### Why are the changes needed?
`scol_for` is an internal method that is not exposed to users, so this eager
validation seems unnecessary.
When a bad column name is used:
before: `scol_for` fails immediately;
after: the `scol_for` call succeeds silently, and the failure is deferred to
analysis (e.g. `dtypes`/`schema`) or execution.
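For illustration, a minimal sketch of the new behavior under Spark Connect
(the connect URL, the sample data, and the bad column name are hypothetical):

from pyspark.sql import SparkSession
from pyspark.pandas.utils import scol_for

# Hypothetical Spark Connect session; any remote URL behaves the same way.
spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()
sdf = spark.createDataFrame([(1, "x")], ["id", "value"])

# Before this change: an analysis error was raised right here.
# After this change: returns an unresolved Column, no server round trip.
col = scol_for(sdf, "no_such_column")

# The bad name now surfaces only when the plan is analyzed or executed:
# sdf.select(col).schema  # the analysis error is raised here instead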
### Does this PR introduce _any_ user-facing change?
no, test only
### How was this patch tested?
CI
### Was this patch authored or co-authored using generative AI tooling?
no
Closes #45752 from zhengruifeng/test_avoid_validation.
Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
---
python/pyspark/pandas/utils.py | 5 ++++-
python/pyspark/sql/connect/dataframe.py | 24 +++++++++++-------------
2 files changed, 15 insertions(+), 14 deletions(-)
diff --git a/python/pyspark/pandas/utils.py b/python/pyspark/pandas/utils.py
index 57c1ddbe6ae3..0fe2944bcabe 100644
--- a/python/pyspark/pandas/utils.py
+++ b/python/pyspark/pandas/utils.py
@@ -608,7 +608,10 @@ def lazy_property(fn: Callable[[Any], Any]) -> property:
def scol_for(sdf: PySparkDataFrame, column_name: str) -> Column:
"""Return Spark Column for the given column name."""
- return sdf["`{}`".format(column_name)]
+ if is_remote():
+ return sdf._col("`{}`".format(column_name)) # type: ignore[operator]
+ else:
+ return sdf["`{}`".format(column_name)]
def column_labels_level(column_labels: List[Label]) -> int:
diff --git a/python/pyspark/sql/connect/dataframe.py b/python/pyspark/sql/connect/dataframe.py
index 672ac8b9c25c..b2d0cc5fedca 100644
--- a/python/pyspark/sql/connect/dataframe.py
+++ b/python/pyspark/sql/connect/dataframe.py
@@ -645,7 +645,7 @@ class DataFrame:
if isinstance(col, Column):
return col
else:
- return Column(ColumnReference(col, df._plan._plan_id))
+ return df._col(col)
return DataFrame(
plan.AsOfJoin(
@@ -1715,12 +1715,7 @@ class DataFrame:
error_class="ATTRIBUTE_NOT_SUPPORTED",
message_parameters={"attr_name": name}
)
- return Column(
- ColumnReference(
- unparsed_identifier=name,
- plan_id=self._plan._plan_id,
- )
- )
+ return self._col(name)
__getattr__.__doc__ = PySparkDataFrame.__getattr__.__doc__
@@ -1756,12 +1751,7 @@ class DataFrame:
if item not in self.columns:
self.select(item).isLocal()
- return Column(
- ColumnReference(
- unparsed_identifier=item,
- plan_id=self._plan._plan_id,
- )
- )
+ return self._col(item)
elif isinstance(item, Column):
return self.filter(item)
elif isinstance(item, (list, tuple)):
@@ -1774,6 +1764,14 @@ class DataFrame:
message_parameters={"arg_name": "item", "arg_type": type(item).__name__},
)
+ def _col(self, name: str) -> Column:
+ return Column(
+ ColumnReference(
+ unparsed_identifier=name,
+ plan_id=self._plan._plan_id,
+ )
+ )
+
def __dir__(self) -> List[str]:
attrs = set(super().__dir__())
attrs.update(self.columns)
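As a side note on the refactor above: `__getattr__` and `__getitem__` now
route through the shared private helper `_col`, which builds an unresolved
`ColumnReference` tagged with the originating plan id. A rough usage sketch
(the connect URL and column name below are hypothetical):

from pyspark.sql import SparkSession

spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()
df = spark.range(3).withColumnRenamed("id", "n")

# Both attribute and item access now go through DataFrame._col:
c1 = df.n      # __getattr__ -> self._col("n")
c2 = df["n"]   # __getitem__ -> self._col("n")

# The reference carries the plan_id of the DataFrame it came from, which
# lets the server resolve the column against the right plan (e.g. after
# self-joins).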