This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new cd1ec48b212 [SPARK-46082][PYTHON][CONNECT] Fix protobuf string
representation for Pandas Functions API with Spark Connect
cd1ec48b212 is described below
commit cd1ec48b212791c1326a818b6c17ff01398578f7
Author: Hyukjin Kwon <[email protected]>
AuthorDate: Fri Nov 24 12:02:32 2023 +0900
[SPARK-46082][PYTHON][CONNECT] Fix protobuf string representation for
Pandas Functions API with Spark Connect
### What changes were proposed in this pull request?
This PR proposes to rename `_func` to `_functions` in the protobuf
instances for Pandas Functions API with Spark Connect so the string
presentation includes them (see also
https://github.com/apache/spark/pull/39223).
### Why are the changes needed?
In order to have the pretty string format for protobuf messages in Python
side.
### Does this PR introduce _any_ user-facing change?
Yes,
```bash
./bin/pyspark --remote local
```
```python
df = spark.range(1)
print(df.mapInPandas(lambda x: x, df.schema)._plan.print())
```
**Before:**
```
<MapPartitions is_barrier='False'>
<Range start='0', end='1', step='1', num_partitions='None'>
```
**After:**
```
<MapPartitions function='<lambda>(id)', is_barrier='False'>
<Range start='0', end='1', step='1', num_partitions='None'>
```
### How was this patch tested?
Manually tested as above.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #43991 from HyukjinKwon/fix-print-proto.
Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
python/pyspark/sql/connect/group.py | 2 --
python/pyspark/sql/connect/plan.py | 17 ++++++++---------
2 files changed, 8 insertions(+), 11 deletions(-)
diff --git a/python/pyspark/sql/connect/group.py b/python/pyspark/sql/connect/group.py
index 481b7981a15..6f98186d9d9 100644
--- a/python/pyspark/sql/connect/group.py
+++ b/python/pyspark/sql/connect/group.py
@@ -378,7 +378,6 @@ class PandasCogroupedOps:
evalType=PythonEvalType.SQL_COGROUPED_MAP_PANDAS_UDF,
)
- all_cols = self._extract_cols(self._gd1) + self._extract_cols(self._gd2)
return DataFrame.withPlan(
plan.CoGroupMap(
input=self._gd1._df._plan,
@@ -386,7 +385,6 @@ class PandasCogroupedOps:
other=self._gd2._df._plan,
other_grouping_cols=self._gd2._grouping_cols,
function=udf_obj,
- cols=all_cols,
),
session=self._gd1._df._session,
)
diff --git a/python/pyspark/sql/connect/plan.py b/python/pyspark/sql/connect/plan.py
index 7d63f8714a9..4b18914446d 100644
--- a/python/pyspark/sql/connect/plan.py
+++ b/python/pyspark/sql/connect/plan.py
@@ -2208,14 +2208,14 @@ class MapPartitions(LogicalPlan):
) -> None:
super().__init__(child)
- self._func = function._build_common_inline_user_defined_function(*cols)
+ self._function = function._build_common_inline_user_defined_function(*cols)
self._is_barrier = is_barrier
def plan(self, session: "SparkConnectClient") -> proto.Relation:
assert self._child is not None
plan = self._create_proto_relation()
plan.map_partitions.input.CopyFrom(self._child.plan(session))
- plan.map_partitions.func.CopyFrom(self._func.to_plan_udf(session))
+ plan.map_partitions.func.CopyFrom(self._function.to_plan_udf(session))
plan.map_partitions.is_barrier = self._is_barrier
return plan
@@ -2234,7 +2234,7 @@ class GroupMap(LogicalPlan):
super().__init__(child)
self._grouping_cols = grouping_cols
- self._func = function._build_common_inline_user_defined_function(*cols)
+ self._function = function._build_common_inline_user_defined_function(*cols)
def plan(self, session: "SparkConnectClient") -> proto.Relation:
assert self._child is not None
@@ -2243,7 +2243,7 @@ class GroupMap(LogicalPlan):
plan.group_map.grouping_expressions.extend(
[c.to_plan(session) for c in self._grouping_cols]
)
- plan.group_map.func.CopyFrom(self._func.to_plan_udf(session))
+ plan.group_map.func.CopyFrom(self._function.to_plan_udf(session))
return plan
@@ -2257,7 +2257,6 @@ class CoGroupMap(LogicalPlan):
other: Optional["LogicalPlan"],
other_grouping_cols: Sequence[Column],
function: "UserDefinedFunction",
- cols: List[Column],
):
assert isinstance(input_grouping_cols, list) and all(
isinstance(c, Column) for c in input_grouping_cols
@@ -2272,7 +2271,7 @@ class CoGroupMap(LogicalPlan):
self._other = cast(LogicalPlan, other)
# The function takes entire DataFrame as inputs, no need to do
# column binding (no input columns).
- self._func = function._build_common_inline_user_defined_function()
+ self._function = function._build_common_inline_user_defined_function()
def plan(self, session: "SparkConnectClient") -> proto.Relation:
assert self._child is not None
@@ -2285,7 +2284,7 @@ class CoGroupMap(LogicalPlan):
plan.co_group_map.other_grouping_expressions.extend(
[c.to_plan(session) for c in self._other_grouping_cols]
)
- plan.co_group_map.func.CopyFrom(self._func.to_plan_udf(session))
+ plan.co_group_map.func.CopyFrom(self._function.to_plan_udf(session))
return plan
@@ -2307,7 +2306,7 @@ class ApplyInPandasWithState(LogicalPlan):
super().__init__(child)
self._grouping_cols = grouping_cols
- self._func = function._build_common_inline_user_defined_function(*cols)
+ self._function = function._build_common_inline_user_defined_function(*cols)
self._output_schema = output_schema
self._state_schema = state_schema
self._output_mode = output_mode
@@ -2320,7 +2319,7 @@ class ApplyInPandasWithState(LogicalPlan):
plan.apply_in_pandas_with_state.grouping_expressions.extend(
[c.to_plan(session) for c in self._grouping_cols]
)
- plan.apply_in_pandas_with_state.func.CopyFrom(self._func.to_plan_udf(session))
+ plan.apply_in_pandas_with_state.func.CopyFrom(self._function.to_plan_udf(session))
plan.apply_in_pandas_with_state.output_schema = self._output_schema
plan.apply_in_pandas_with_state.state_schema = self._state_schema
plan.apply_in_pandas_with_state.output_mode = self._output_mode
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]