This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 7e0a5ef903c [SPARK-39962][PYTHON][SQL] Apply projection when group attributes are empty
7e0a5ef903c is described below

commit 7e0a5ef903c41eea3b0d1220bfabda2c8b8a5ac4
Author: Hyukjin Kwon <gurwls...@apache.org>
AuthorDate: Wed Aug 3 16:11:20 2022 +0900

    [SPARK-39962][PYTHON][SQL] Apply projection when group attributes are empty
    
    This PR proposes to apply the projection to respect the reordered columns in its child when group attributes are empty.
    
    This change is needed so that the aggregation respects the column order in the child.
    
    This introduces a user-facing change: yes, it fixes a bug, as shown below:
    
    ```python
    import pandas as pd
    from pyspark.sql import functions as f
    
    @f.pandas_udf("double")
    def AVG(x: pd.Series) -> float:
        return x.mean()
    
    abc = spark.createDataFrame([(1.0, 5.0, 17.0)], schema=["a", "b", "c"])
    abc.agg(AVG("a"), AVG("c")).show()
    abc.select("c", "a").agg(AVG("a"), AVG("c")).show()
    ```
    
    **Before**
    
    ```
    +------+------+
    |AVG(a)|AVG(c)|
    +------+------+
    |  17.0|   1.0|
    +------+------+
    ```
    
    **After**
    
    ```
    +------+------+
    |AVG(a)|AVG(c)|
    +------+------+
    |   1.0|  17.0|
    +------+------+
    ```
    
    Manually tested, and added a unit test.
    
    Closes #37390 from HyukjinKwon/SPARK-39962.
    
    Authored-by: Hyukjin Kwon <gurwls...@apache.org>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
    (cherry picked from commit 5335c784ae76c9cc0aaa7a4b57b3cd6b3891ad9a)
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 .../spark/sql/execution/python/AggregateInPandasExec.scala  |  5 +++--
 .../apache/spark/sql/execution/python/PythonUDFSuite.scala  | 13 +++++++++++++
 2 files changed, 16 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala
index dadf1129c34..791af2a6aee 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala
@@ -109,12 +109,13 @@ case class AggregateInPandasExec(
     inputRDD.mapPartitionsInternal { iter => if (iter.isEmpty) iter else {
       val prunedProj = UnsafeProjection.create(allInputs.toSeq, child.output)
 
-      val grouped = if (groupingExpressions.isEmpty) {
+      val groupedItr = if (groupingExpressions.isEmpty) {
         // Use an empty unsafe row as a place holder for the grouping key
         Iterator((new UnsafeRow(), iter))
       } else {
         GroupedIterator(iter, groupingExpressions, child.output)
-      }.map { case (key, rows) =>
+      }
+      val grouped = groupedItr.map { case (key, rows) =>
         (key, rows.map(prunedProj))
       }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonUDFSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonUDFSuite.scala
index 45b57207c57..4ad7f901053 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonUDFSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonUDFSuite.scala
@@ -71,4 +71,17 @@ class PythonUDFSuite extends QueryTest with SharedSparkSession {
         pythonTestUDF(count(pythonTestUDF(base("a") + 1))))
     checkAnswer(df1, df2)
   }
+
+  test("SPARK-39962: Global aggregation of Pandas UDF should respect the column order") {
+    assume(shouldTestGroupedAggPandasUDFs)
+    val df = Seq[(java.lang.Integer, java.lang.Integer)]((1, null)).toDF("a", "b")
+
+    val pandasTestUDF = TestGroupedAggPandasUDF(name = "pandas_udf")
+    val reorderedDf = df.select("b", "a")
+    val actual = reorderedDf.agg(
+      pandasTestUDF(reorderedDf("a")), pandasTestUDF(reorderedDf("b")))
+    val expected = df.agg(pandasTestUDF(df("a")), pandasTestUDF(df("b")))
+
+    checkAnswer(actual, expected)
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to