This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new e4d041232e7 [SPARK-40907][PS][SQL] `PandasMode` should copy keys before inserting into Map e4d041232e7 is described below commit e4d041232e726a9c1b9dc8ab8dcdf114b34841d4 Author: Ruifeng Zheng <ruife...@apache.org> AuthorDate: Tue Oct 25 23:56:02 2022 +0900 [SPARK-40907][PS][SQL] `PandasMode` should copy keys before inserting into Map ### What changes were proposed in this pull request? Make `PandasMode` copy keys before inserting into Map ### Why are the changes needed? correctness issue similar to https://github.com/apache/spark/pull/38383, make it a separate PR since it is dedicated for Pandas API ``` In [24]: def f(index, iterator): return ['3', '3', '3', '3', '4'] if index == 3 else ['0', '1', '2', '3', '4'] In [25]: rdd = sc.parallelize([1, ], 4).mapPartitionsWithIndex(f) In [26]: df = spark.createDataFrame(rdd, schema='string') In [27]: psdf = df.pandas_api() In [28]: psdf.mode() Out[28]: value 0 4 In [29]: psdf._to_pandas().mode() Out[29]: value 0 3 ``` ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? added UT Closes #38385 from zhengruifeng/ps_mode_fix. 
Authored-by: Ruifeng Zheng <ruife...@apache.org> Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> --- python/pyspark/pandas/tests/test_dataframe.py | 13 +++++++++++++ .../spark/sql/catalyst/expressions/aggregate/Mode.scala | 8 +++++--- 2 files changed, 18 insertions(+), 3 deletions(-) diff --git a/python/pyspark/pandas/tests/test_dataframe.py b/python/pyspark/pandas/tests/test_dataframe.py index 166c18ba4e9..b5466b467d8 100644 --- a/python/pyspark/pandas/tests/test_dataframe.py +++ b/python/pyspark/pandas/tests/test_dataframe.py @@ -6044,6 +6044,19 @@ class DataFrameTest(ComparisonTestBase, SQLTestUtils): with self.assertRaises(ValueError): psdf.mode(axis=2) + def f(index, iterator): + return ["3", "3", "3", "3", "4"] if index == 3 else ["0", "1", "2", "3", "4"] + + rdd = self.spark.sparkContext.parallelize( + [ + 1, + ], + 4, + ).mapPartitionsWithIndex(f) + df = self.spark.createDataFrame(rdd, schema="string") + psdf = df.pandas_api() + self.assert_eq(psdf.mode(), psdf._to_pandas().mode()) + def test_abs(self): pdf = pd.DataFrame({"a": [-2, -1, 0, 1]}) psdf = ps.from_pandas(pdf) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala index 51bfd2b9862..cd6e1a5a18e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala @@ -121,10 +121,12 @@ case class PandasMode( override def update( buffer: OpenHashMap[AnyRef, Long], input: InternalRow): OpenHashMap[AnyRef, Long] = { - val key = child.eval(input).asInstanceOf[AnyRef] + val key = child.eval(input) - if (key != null || !ignoreNA) { - buffer.changeValue(key, 1L, _ + 1L) + if (key != null) { + buffer.changeValue(InternalRow.copyValue(key).asInstanceOf[AnyRef], 1L, _ + 1L) + } else if (!ignoreNA) { + buffer.changeValue(null, 1L, _ + 1L)
} buffer } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org