This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new cd1ec48b212 [SPARK-46082][PYTHON][CONNECT] Fix protobuf string 
representation for Pandas Functions API with Spark Connect
cd1ec48b212 is described below

commit cd1ec48b212791c1326a818b6c17ff01398578f7
Author: Hyukjin Kwon <[email protected]>
AuthorDate: Fri Nov 24 12:02:32 2023 +0900

    [SPARK-46082][PYTHON][CONNECT] Fix protobuf string representation for 
Pandas Functions API with Spark Connect
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to rename `_func` to `_function` in the protobuf 
instances for Pandas Functions API with Spark Connect so the string 
representation includes them (see also 
https://github.com/apache/spark/pull/39223).
    
    ### Why are the changes needed?
    
    So that protobuf messages have a pretty string format on the Python 
side.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, the plan's string representation now includes the function. For example:
    
    ```bash
    ./bin/pyspark --remote local
    ```
    
    ```python
    df = spark.range(1)
    print(df.mapInPandas(lambda x: x, df.schema)._plan.print())
    ```
    
    **Before:**
    ```
    <MapPartitions is_barrier='False'>
      <Range start='0', end='1', step='1', num_partitions='None'>
    ```
    
    **After:**
    
    ```
    <MapPartitions function='<lambda>(id)', is_barrier='False'>
      <Range start='0', end='1', step='1', num_partitions='None'>
    ```
    
    ### How was this patch tested?
    
    Manually tested as above.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #43991 from HyukjinKwon/fix-print-proto.
    
    Authored-by: Hyukjin Kwon <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 python/pyspark/sql/connect/group.py |  2 --
 python/pyspark/sql/connect/plan.py  | 17 ++++++++---------
 2 files changed, 8 insertions(+), 11 deletions(-)

diff --git a/python/pyspark/sql/connect/group.py 
b/python/pyspark/sql/connect/group.py
index 481b7981a15..6f98186d9d9 100644
--- a/python/pyspark/sql/connect/group.py
+++ b/python/pyspark/sql/connect/group.py
@@ -378,7 +378,6 @@ class PandasCogroupedOps:
             evalType=PythonEvalType.SQL_COGROUPED_MAP_PANDAS_UDF,
         )
 
-        all_cols = self._extract_cols(self._gd1) + 
self._extract_cols(self._gd2)
         return DataFrame.withPlan(
             plan.CoGroupMap(
                 input=self._gd1._df._plan,
@@ -386,7 +385,6 @@ class PandasCogroupedOps:
                 other=self._gd2._df._plan,
                 other_grouping_cols=self._gd2._grouping_cols,
                 function=udf_obj,
-                cols=all_cols,
             ),
             session=self._gd1._df._session,
         )
diff --git a/python/pyspark/sql/connect/plan.py 
b/python/pyspark/sql/connect/plan.py
index 7d63f8714a9..4b18914446d 100644
--- a/python/pyspark/sql/connect/plan.py
+++ b/python/pyspark/sql/connect/plan.py
@@ -2208,14 +2208,14 @@ class MapPartitions(LogicalPlan):
     ) -> None:
         super().__init__(child)
 
-        self._func = function._build_common_inline_user_defined_function(*cols)
+        self._function = 
function._build_common_inline_user_defined_function(*cols)
         self._is_barrier = is_barrier
 
     def plan(self, session: "SparkConnectClient") -> proto.Relation:
         assert self._child is not None
         plan = self._create_proto_relation()
         plan.map_partitions.input.CopyFrom(self._child.plan(session))
-        plan.map_partitions.func.CopyFrom(self._func.to_plan_udf(session))
+        plan.map_partitions.func.CopyFrom(self._function.to_plan_udf(session))
         plan.map_partitions.is_barrier = self._is_barrier
         return plan
 
@@ -2234,7 +2234,7 @@ class GroupMap(LogicalPlan):
 
         super().__init__(child)
         self._grouping_cols = grouping_cols
-        self._func = function._build_common_inline_user_defined_function(*cols)
+        self._function = 
function._build_common_inline_user_defined_function(*cols)
 
     def plan(self, session: "SparkConnectClient") -> proto.Relation:
         assert self._child is not None
@@ -2243,7 +2243,7 @@ class GroupMap(LogicalPlan):
         plan.group_map.grouping_expressions.extend(
             [c.to_plan(session) for c in self._grouping_cols]
         )
-        plan.group_map.func.CopyFrom(self._func.to_plan_udf(session))
+        plan.group_map.func.CopyFrom(self._function.to_plan_udf(session))
         return plan
 
 
@@ -2257,7 +2257,6 @@ class CoGroupMap(LogicalPlan):
         other: Optional["LogicalPlan"],
         other_grouping_cols: Sequence[Column],
         function: "UserDefinedFunction",
-        cols: List[Column],
     ):
         assert isinstance(input_grouping_cols, list) and all(
             isinstance(c, Column) for c in input_grouping_cols
@@ -2272,7 +2271,7 @@ class CoGroupMap(LogicalPlan):
         self._other = cast(LogicalPlan, other)
         # The function takes entire DataFrame as inputs, no need to do
         # column binding (no input columns).
-        self._func = function._build_common_inline_user_defined_function()
+        self._function = function._build_common_inline_user_defined_function()
 
     def plan(self, session: "SparkConnectClient") -> proto.Relation:
         assert self._child is not None
@@ -2285,7 +2284,7 @@ class CoGroupMap(LogicalPlan):
         plan.co_group_map.other_grouping_expressions.extend(
             [c.to_plan(session) for c in self._other_grouping_cols]
         )
-        plan.co_group_map.func.CopyFrom(self._func.to_plan_udf(session))
+        plan.co_group_map.func.CopyFrom(self._function.to_plan_udf(session))
         return plan
 
 
@@ -2307,7 +2306,7 @@ class ApplyInPandasWithState(LogicalPlan):
 
         super().__init__(child)
         self._grouping_cols = grouping_cols
-        self._func = function._build_common_inline_user_defined_function(*cols)
+        self._function = 
function._build_common_inline_user_defined_function(*cols)
         self._output_schema = output_schema
         self._state_schema = state_schema
         self._output_mode = output_mode
@@ -2320,7 +2319,7 @@ class ApplyInPandasWithState(LogicalPlan):
         plan.apply_in_pandas_with_state.grouping_expressions.extend(
             [c.to_plan(session) for c in self._grouping_cols]
         )
-        
plan.apply_in_pandas_with_state.func.CopyFrom(self._func.to_plan_udf(session))
+        
plan.apply_in_pandas_with_state.func.CopyFrom(self._function.to_plan_udf(session))
         plan.apply_in_pandas_with_state.output_schema = self._output_schema
         plan.apply_in_pandas_with_state.state_schema = self._state_schema
         plan.apply_in_pandas_with_state.output_mode = self._output_mode


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to