This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new e4d041232e7 [SPARK-40907][PS][SQL] `PandasMode` should copy keys before inserting into Map
e4d041232e7 is described below

commit e4d041232e726a9c1b9dc8ab8dcdf114b34841d4
Author: Ruifeng Zheng <ruife...@apache.org>
AuthorDate: Tue Oct 25 23:56:02 2022 +0900

    [SPARK-40907][PS][SQL] `PandasMode` should copy keys before inserting into Map
    
    ### What changes were proposed in this pull request?
    Make `PandasMode` copy keys before inserting into Map
    
    ### Why are the changes needed?
    This fixes a correctness issue similar to https://github.com/apache/spark/pull/38383; it is a separate PR because it is specific to the Pandas API.
    
    ```
    In [24]: def f(index, iterator): return ['3', '3', '3', '3', '4'] if index == 3 else ['0', '1', '2', '3', '4']
    
    In [25]: rdd = sc.parallelize([1, ], 4).mapPartitionsWithIndex(f)
    
    In [26]: df = spark.createDataFrame(rdd, schema='string')
    
    In [27]: psdf = df.pandas_api()
    
    In [28]: psdf.mode()
    Out[28]:
      value
    0     4
    
    In [29]: psdf._to_pandas().mode()
    Out[29]:
      value
    0     3
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    added UT
    
    Closes #38385 from zhengruifeng/ps_mode_fix.
    
    Authored-by: Ruifeng Zheng <ruife...@apache.org>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 python/pyspark/pandas/tests/test_dataframe.py               | 13 +++++++++++++
 .../spark/sql/catalyst/expressions/aggregate/Mode.scala     |  8 +++++---
 2 files changed, 18 insertions(+), 3 deletions(-)

diff --git a/python/pyspark/pandas/tests/test_dataframe.py b/python/pyspark/pandas/tests/test_dataframe.py
index 166c18ba4e9..b5466b467d8 100644
--- a/python/pyspark/pandas/tests/test_dataframe.py
+++ b/python/pyspark/pandas/tests/test_dataframe.py
@@ -6044,6 +6044,19 @@ class DataFrameTest(ComparisonTestBase, SQLTestUtils):
         with self.assertRaises(ValueError):
             psdf.mode(axis=2)
 
+        def f(index, iterator):
+            return ["3", "3", "3", "3", "4"] if index == 3 else ["0", "1", "2", "3", "4"]
+
+        rdd = self.spark.sparkContext.parallelize(
+            [
+                1,
+            ],
+            4,
+        ).mapPartitionsWithIndex(f)
+        df = self.spark.createDataFrame(rdd, schema="string")
+        psdf = df.pandas_api()
+        self.assert_eq(psdf.mode(), psdf._to_pandas().mode())
+
     def test_abs(self):
         pdf = pd.DataFrame({"a": [-2, -1, 0, 1]})
         psdf = ps.from_pandas(pdf)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala
index 51bfd2b9862..cd6e1a5a18e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala
@@ -121,10 +121,12 @@ case class PandasMode(
   override def update(
       buffer: OpenHashMap[AnyRef, Long],
       input: InternalRow): OpenHashMap[AnyRef, Long] = {
-    val key = child.eval(input).asInstanceOf[AnyRef]
+    val key = child.eval(input)
 
-    if (key != null || !ignoreNA) {
-      buffer.changeValue(key, 1L, _ + 1L)
+    if (key != null) {
+      buffer.changeValue(InternalRow.copyValue(key).asInstanceOf[AnyRef], 1L, _ + 1L)
+    } else if (!ignoreNA) {
+      buffer.changeValue(null, 1L, _ + 1L)
     }
     buffer
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to