This is an automated email from the ASF dual-hosted git repository.
ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 83167e56ff9 [SPARK-40542][PS][SQL] Make `ddof` in `DataFrame.std` and
`Series.std` accept arbitrary integers
83167e56ff9 is described below
commit 83167e56ff9cdfeb29da81d07d56b482ccfedc74
Author: Ruifeng Zheng <[email protected]>
AuthorDate: Fri Sep 23 10:13:33 2022 +0800
[SPARK-40542][PS][SQL] Make `ddof` in `DataFrame.std` and `Series.std`
accept arbitrary integers
### What changes were proposed in this pull request?
Add a new `std` expression to support an arbitrary integral `ddof`.
### Why are the changes needed?
for API coverage
### Does this PR introduce _any_ user-facing change?
Yes, it accepts `ddof` values other than {0, 1}.
before
```
In [4]: df = ps.DataFrame({'a': [1, 2, 3, np.nan], 'b': [0.1, 0.2, 0.3,
np.nan]}, columns=['a', 'b'])
In [5]: df.std(ddof=2)
---------------------------------------------------------------------------
AssertionError Traceback (most recent call last)
Cell In [5], line 1
----> 1 df.std(ddof=2)
File ~/Dev/spark/python/pyspark/pandas/generic.py:1866, in Frame.std(self,
axis, skipna, ddof, numeric_only)
1803 def std(
1804 self,
1805 axis: Optional[Axis] = None,
(...)
1808 numeric_only: bool = None,
1809 ) -> Union[Scalar, "Series"]:
1810 """
1811 Return sample standard deviation.
1812
(...)
1864 0.816496580927726
1865 """
-> 1866 assert ddof in (0, 1)
1868 axis = validate_axis(axis)
1870 if numeric_only is None and axis == 0:
AssertionError:
```
after:
```
In [3]: df = ps.DataFrame({'a': [1, 2, 3, np.nan], 'b': [0.1, 0.2, 0.3,
np.nan]}, columns=['a', 'b'])
In [4]: df.std(ddof=2)
Out[4]:
a 1.414214
b 0.141421
dtype: float64
In [5]: df.to_pandas().std(ddof=2)
/Users/ruifeng.zheng/Dev/spark/python/pyspark/pandas/utils.py:975:
PandasAPIOnSparkAdviceWarning: `to_pandas` loads all data into the driver's
memory. It should only be used if the resulting pandas DataFrame is expected to
be small.
warnings.warn(message, PandasAPIOnSparkAdviceWarning)
Out[5]:
a 1.414214
b 0.141421
dtype: float64
```
### How was this patch tested?
Added test suites.
Closes #37974 from zhengruifeng/ps_std_ddof.
Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
---
python/pyspark/pandas/generic.py | 21 +++++++++++++++-----
python/pyspark/pandas/spark/functions.py | 5 +++++
.../pyspark/pandas/tests/test_generic_functions.py | 6 ++++++
python/pyspark/pandas/tests/test_stats.py | 6 ++++++
.../expressions/aggregate/CentralMomentAgg.scala | 23 ++++++++++++++++++++++
.../spark/sql/api/python/PythonSQLUtils.scala | 4 ++++
6 files changed, 60 insertions(+), 5 deletions(-)
diff --git a/python/pyspark/pandas/generic.py b/python/pyspark/pandas/generic.py
index cafa37e3d9b..6ba967da7f5 100644
--- a/python/pyspark/pandas/generic.py
+++ b/python/pyspark/pandas/generic.py
@@ -1810,6 +1810,8 @@ class Frame(object, metaclass=ABCMeta):
"""
Return sample standard deviation.
+ .. versionadded:: 3.3.0
+
Parameters
----------
axis : {index (0), columns (1)}
@@ -1822,6 +1824,9 @@ class Frame(object, metaclass=ABCMeta):
ddof : int, default 1
Delta Degrees of Freedom. The divisor used in calculations is N -
ddof,
where N represents the number of elements.
+
+ .. versionchanged:: 3.4.0
+ Supported including arbitrary integers.
numeric_only : bool, default None
Include only float, int, boolean columns. False is not supported.
This parameter
is mainly for pandas compatibility.
@@ -1843,6 +1848,11 @@ class Frame(object, metaclass=ABCMeta):
b 0.1
dtype: float64
+ >>> df.std(ddof=2)
+ a 1.414214
+ b 0.141421
+ dtype: float64
+
>>> df.std(axis=1)
0 0.636396
1 1.272792
@@ -1862,8 +1872,12 @@ class Frame(object, metaclass=ABCMeta):
>>> df['a'].std(ddof=0)
0.816496580927726
+
+ >>> df['a'].std(ddof=-1)
+ 0.707106...
"""
- assert ddof in (0, 1)
+ if not isinstance(ddof, int):
+ raise TypeError("ddof must be integer")
axis = validate_axis(axis)
@@ -1881,10 +1895,7 @@ class Frame(object, metaclass=ABCMeta):
spark_type_to_pandas_dtype(spark_type),
spark_type.simpleString()
)
)
- if ddof == 0:
- return F.stddev_pop(spark_column)
- else:
- return F.stddev_samp(spark_column)
+ return SF.stddev(spark_column, ddof)
return self._reduce_for_stat_function(
std, name="std", axis=axis, numeric_only=numeric_only, ddof=ddof,
skipna=skipna
diff --git a/python/pyspark/pandas/spark/functions.py
b/python/pyspark/pandas/spark/functions.py
index ed3a5ae430d..3aa9d9a37dd 100644
--- a/python/pyspark/pandas/spark/functions.py
+++ b/python/pyspark/pandas/spark/functions.py
@@ -27,6 +27,11 @@ from pyspark.sql.column import (
)
+def stddev(col: Column, ddof: int) -> Column:
+ sc = SparkContext._active_spark_context
+ return Column(sc._jvm.PythonSQLUtils.pandasStddev(col._jc, ddof))
+
+
def skew(col: Column) -> Column:
sc = SparkContext._active_spark_context
return Column(sc._jvm.PythonSQLUtils.pandasSkewness(col._jc))
diff --git a/python/pyspark/pandas/tests/test_generic_functions.py
b/python/pyspark/pandas/tests/test_generic_functions.py
index 06f86c55a38..c8f6dc275da 100644
--- a/python/pyspark/pandas/tests/test_generic_functions.py
+++ b/python/pyspark/pandas/tests/test_generic_functions.py
@@ -166,6 +166,7 @@ class GenericFunctionsTest(PandasOnSparkTestCase,
TestUtils):
self._test_stat_functions(lambda x: x.max(skipna=False))
self._test_stat_functions(lambda x: x.std())
self._test_stat_functions(lambda x: x.std(skipna=False))
+ self._test_stat_functions(lambda x: x.std(ddof=2))
self._test_stat_functions(lambda x: x.sem())
self._test_stat_functions(lambda x: x.sem(skipna=False))
# self._test_stat_functions(lambda x: x.skew())
@@ -175,6 +176,11 @@ class GenericFunctionsTest(PandasOnSparkTestCase,
TestUtils):
pdf = pd.DataFrame({"a": [np.nan, np.nan, np.nan], "b": [1, np.nan,
2], "c": [1, 2, 3]})
psdf = ps.from_pandas(pdf)
+ with self.assertRaisesRegex(TypeError, "ddof must be integer"):
+ psdf.std(ddof="ddof")
+ with self.assertRaisesRegex(TypeError, "ddof must be integer"):
+ psdf.a.std(ddof="ddof")
+
self.assert_eq(pdf.a.median(), psdf.a.median())
self.assert_eq(pdf.a.median(skipna=False), psdf.a.median(skipna=False))
self.assert_eq(1.0, psdf.b.median())
diff --git a/python/pyspark/pandas/tests/test_stats.py
b/python/pyspark/pandas/tests/test_stats.py
index db3f7fd45fe..7a6a0d67494 100644
--- a/python/pyspark/pandas/tests/test_stats.py
+++ b/python/pyspark/pandas/tests/test_stats.py
@@ -450,6 +450,7 @@ class StatsTest(PandasOnSparkTestCase, SQLTestUtils):
self.assert_eq(psser.var(ddof=0), pser.var(ddof=0), almost=True)
self.assert_eq(psser.std(), pser.std(), almost=True)
self.assert_eq(psser.std(ddof=0), pser.std(ddof=0), almost=True)
+ self.assert_eq(psser.std(ddof=2), pser.std(ddof=2), almost=True)
self.assert_eq(psser.sem(), pser.sem(), almost=True)
self.assert_eq(psser.sem(ddof=0), pser.sem(ddof=0), almost=True)
@@ -488,6 +489,11 @@ class StatsTest(PandasOnSparkTestCase, SQLTestUtils):
pdf.std(ddof=0, numeric_only=True),
check_exact=False,
)
+ self.assert_eq(
+ psdf.std(ddof=2, numeric_only=True),
+ pdf.std(ddof=2, numeric_only=True),
+ check_exact=False,
+ )
self.assert_eq(psdf.sem(numeric_only=True),
pdf.sem(numeric_only=True), check_exact=False)
self.assert_eq(
psdf.sem(ddof=0, numeric_only=True),
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/CentralMomentAgg.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/CentralMomentAgg.scala
index 8d9c221ff94..b830dd6c088 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/CentralMomentAgg.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/CentralMomentAgg.scala
@@ -339,6 +339,29 @@ case class Kurtosis(
copy(child = newChild)
}
+/**
+ * Standard deviation in Pandas' fashion.
+ * This expression is dedicated only for Pandas API on Spark.
+ * Refer to pandas.core.nanops.nanstd.
+ */
+case class PandasStddev(
+ child: Expression,
+ ddof: Int)
+ extends CentralMomentAgg(child, true) {
+
+ override protected def momentOrder = 2
+
+ override val evaluateExpression: Expression = {
+ If(n === 0.0, Literal.create(null, DoubleType),
+ If(n === ddof, divideByZeroEvalResult, sqrt(m2 / (n - ddof))))
+ }
+
+ override def prettyName: String = "pandas_stddev"
+
+ override protected def withNewChildInternal(newChild: Expression):
PandasStddev =
+ copy(child = newChild)
+}
+
/**
* Skewness in Pandas' fashion. This expression is dedicated only for Pandas
API on Spark.
* Refer to pandas.core.nanops.nanskew.
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala
b/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala
index c495b145dc6..579a945398f 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala
@@ -155,6 +155,10 @@ private[sql] object PythonSQLUtils extends Logging {
Column(TimestampDiff(unit, start.expr, end.expr))
}
+ def pandasStddev(e: Column, ddof: Int): Column = {
+ Column(PandasStddev(e.expr, ddof).toAggregateExpression(false))
+ }
+
def pandasSkewness(e: Column): Column = {
Column(PandasSkewness(e.expr).toAggregateExpression(false))
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]